In [None]:
!pip install gym[atari]
!pip install autorom[accept-rom-license]
!pip install highway-env

In [None]:
import highway_env
import gymnasium as gym
import sys
import pickle
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
import torch.optim as optim
import random
from collections import namedtuple
import numpy as np
from collections import deque
from torch.autograd import Variable
import torch.nn.functional as F
import tqdm
import os
import cv2
import torch.distributions as distributions
from torch.distributions import Normal
from torch.distributions import MultivariateNormal
from torch.distributions import Categorical

In [None]:
# --- Probe environment -------------------------------------------------------
# A throw-away environment is created, reset once with a fixed seed, and used
# only to read the desired goal of that seeded episode before being closed.
env = gym.make("parking-v0", render_mode="rgb_array")
# Optional tweak left disabled by the original author:
# env.unwrapped.config['add_walls']=False
# NOTE(review): 'duration' is presumably the episode time limit — confirm
# against the highway-env configuration docs.
env.unwrapped.config['duration']=40
obs,_=env.reset(seed=0)
goal=obs['desired_goal'] # layout per the original note: x, y, vx, vy, cos(h), sin(h)
print(goal)
env.close()

# --- Training environment ----------------------------------------------------
# A fresh environment is created for training, with 'duration' set to 20.
env = gym.make("parking-v0", render_mode="rgb_array")
# Optional tweak left disabled by the original author:
# env.unwrapped.config['add_walls']=False
env.unwrapped.config['duration']=20
# NOTE(review): delta_x / delta_y are not referenced in the visible cells —
# confirm whether they are still needed.
delta_x=0.03
delta_y=0.03

In [None]:
def get_q_value(feature,weights,state, action):
    """Return the linear action-value estimate Q(state, action).

    Args:
        feature: Callable mapping ``state`` to a feature array.
        weights: Indexable collection holding one weight vector per action.
        state: State handed to ``feature`` unchanged.
        action: Index selecting which row of ``weights`` to use.

    Returns:
        The dot product of the selected weight vector with the transposed
        feature array.
    """
    phi = feature(state)
    return np.dot(weights[action], phi.T)

In [None]:
def arg_max(lst):
    """Return the index of the largest element, breaking ties at random.

    Args:
        lst: Non-empty sequence of comparable values.

    Returns:
        An index ``i`` such that ``lst[i] == max(lst)``; when several indices
        qualify, one of them is drawn with ``random.choice``.
    """
    best = max(lst)
    tied_indices = []
    for idx, value in enumerate(lst):
        if value == best:
            tied_indices.append(idx)
    return random.choice(tied_indices)

In [None]:
def car_e_greedy(feature,state,weights,epsilon):
    """Pick an action with an epsilon-greedy rule over the linear Q estimates.

    Args:
        feature: Feature function forwarded to ``get_q_value``.
        state: State for which an action is selected.
        weights: Per-action weight vectors forwarded to ``get_q_value``.
        epsilon: Threshold compared against one uniform random draw; a draw
            less than or equal to it triggers a uniformly random action.

    Returns:
        An action index in ``range(num_of_actions)``. ``num_of_actions`` is a
        module-level name that must be defined before this function is called.
    """
    # Exploration branch: a single uniform draw decides explore vs. exploit.
    if np.random.random() <= epsilon:
        return np.random.choice(num_of_actions)

    # Exploitation branch: evaluate every action, take the best (random ties).
    q_values = [
        get_q_value(feature, weights, state, candidate)
        for candidate in range(num_of_actions)
    ]
    return arg_max(q_values)

In [None]:
# Lower and upper bounds of the region that gets binned, one entry per dimension.
min_values = np.array([-1.5, -0.75])
max_values = np.array([4.5, 2.25])

# Number of bins along each dimension, and the resulting width of one bin.
num_bins = np.array([500, 500])
bin_widths = (max_values - min_values) / num_bins


def discretize_state(state):
    """Map a continuous state onto integer bin indices.

    Args:
        state: Array-like with one value per dimension of ``min_values``.

    Returns:
        A tuple of bin indices, one per dimension, each clipped to the range
        ``[0, num_bins - 1]`` so out-of-bounds states land in an edge bin.
    """
    # Offset from the lower bound, measured in bins, truncated to an integer.
    raw_indices = ((state - min_values) / bin_widths).astype(int)
    # Clamp so that states outside the bounds still index a valid bin.
    return tuple(raw_indices.clip(0, num_bins - 1))

In [None]:
def feature_x(state):
    """Encode a state as a one-hot vector over a 500 x 500 grid of bins.

    The state is binned here via ``discretize_state``, so callers should pass
    the raw continuous state; an already-binned tuple would be binned a
    second time.

    Args:
        state: State accepted by ``discretize_state``.

    Returns:
        A 1-D array of length ``500 * 500`` that is zero everywhere except
        for a single 1 at index ``row * 500 + col``.
    """
    grid_side = 500
    cell = discretize_state(state)
    one_hot = np.zeros(grid_side * grid_side)
    # Row-major flattening of the (row, col) bin pair.
    one_hot[cell[0] * grid_side + cell[1]] = 1
    return one_hot

In [None]:
def stepper(current_speeds,action):
    """Apply one discrete action to a 2-component control vector.

    Actions 0/1 add +0.2/-0.2 to the first component and actions 2/3 add
    +0.2/-0.2 to the second one; any other action leaves the values as they
    are. The result is clipped component-wise to ``[-1.0, 1.0]``.

    The input is never modified: the update is done on a float copy. The
    previous in-place ``+=`` left the caller's array holding the unclipped
    value, silently extended a list argument, and failed on integer arrays.

    Args:
        current_speeds: Array-like of two numbers.
        action: Discrete action index.

    Returns:
        A new float ``numpy.ndarray`` with the updated, clipped values.
    """
    increments = {
        0: (0.2, 0.0),
        1: (-0.2, 0.0),
        2: (0.0, 0.2),
        3: (0.0, -0.2),
    }
    # np.array(...) copies, so the caller's object is left untouched.
    updated = np.array(current_speeds, dtype=float)
    updated += np.array(increments.get(action, (0.0, 0.0)))
    return np.clip(updated, -1.0, 1.0)


In [None]:
# --- SARSA with a linear (one-hot) action-value function ----------------------
# Hyper-parameters and bookkeeping containers used by the training loop below.
max_iter=5000          # number of training episodes
alpha=0.1              # step size of the weight update
gamma=0.99             # discount factor
feature=feature_x      # feature map: raw (x, y) -> one-hot vector
actions=np.array([0,1,2,3])
epsilon=0.99           # initial exploration rate, decayed inside the loop
num_of_actions=actions.shape[0]
w=np.zeros((num_of_actions,500*500))  # one weight vector per action
goal_x,goal_y=goal[0:2]
e_t=[]                 # (episode, last step index) pairs
time_step=0            # global step counter driving the epsilon decay
rewards_list=[]        # undiscounted return of every episode
rendered_frames=[]     # frames captured on selected episodes
lengths_list=[]        # number of steps of every episode

pbar=tqdm.trange(max_iter)
for episode in pbar:
    current_speeds=np.array([0.0 ,0.0])
    state,_=env.reset(seed=0)
    # Keep the RAW continuous (x, y) here. `feature` bins the state itself;
    # binning it in the loop as well made the second pass treat bin indices as
    # coordinates, which clipped almost every state into the same edge bin.
    s=np.array(state['observation'][0:2], dtype=float)
    time_step +=1
    a=car_e_greedy(feature,s,w,epsilon)
    t=0
    current_ep_reward=0
    while(True):
        # Turn the discrete action into the control vector sent to the env.
        current_speeds= stepper(current_speeds,a)
        s_prime,r,done,truncated,info=env.step(current_speeds)
        s_prime=np.array(s_prime['observation'][0:2], dtype=float)
        current_ep_reward +=r
        time_step +=1

        # Linear epsilon decay: one notch every 200 global steps, floor 0.01.
        if time_step % 200==0:
            epsilon =max(0.01, epsilon-0.001)

        if info['is_success'] and done:
            print("\n Episode Complete \n ")

        if done or truncated:
            # Terminal update: no bootstrap term, plus a bonus on success.
            # The bonus is not added to the logged episode return.
            if info['is_success']:
                r+= 25
            w[a]+= alpha*(r - get_q_value(feature,w,s,a))*feature(s)
            e_t.append((episode,t))
            pbar.set_description(
                f'Episode: {episode} | Steps: {t + 1} | Return: {current_ep_reward:5.2f} |Epsilon: {epsilon}  '
            )
            rewards_list.append(current_ep_reward)
            lengths_list.append((t+1))
            break
        else:
            # On-policy (SARSA) update: bootstrap from the action that will
            # actually be executed next.
            a_prime=car_e_greedy(feature,s_prime,w,epsilon)
            w[a]+=alpha*(r+gamma*get_q_value(feature,w,s_prime,a_prime)-get_q_value(feature,w,s,a))*feature(s)
            s=s_prime
            a=a_prime
            t+=1

        # Record frames of non-terminal steps on every 499th episode.
        if episode%499==0:
            rendered_frames.append(env.render())



In [None]:
def moving_average(data, *, window_size = 50):
    """Smooths 1-D data using a trailing moving average.

    Element ``i`` of the result is the mean of the last ``window_size``
    values up to and including ``data[i]``; near the start, where fewer
    values exist, the mean is taken over the values available so far.

    Args:
        data: 1-D sequence or numpy.array of numbers.
        window_size: Size of the smoothing window (must be >= 1).

    Returns:
        smooth_data: A 1-D float numpy.array with the same size as data.

    Raises:
        ValueError: If ``data`` is not 1-D or ``window_size`` is < 1.
    """
    values = np.asarray(data, dtype=float)
    if values.ndim != 1:
        raise ValueError("moving_average expects 1-D data")
    if window_size < 1:
        raise ValueError("window_size must be at least 1")
    if values.size == 0:
        # np.convolve rejects empty input; nothing to smooth anyway.
        return values

    kernel = np.ones(window_size)
    window_sums = np.convolve(values, kernel)
    # Number of samples contributing to each position of the full convolution.
    window_counts = np.convolve(np.ones_like(values), kernel)
    # Slice by the input length: the former `[: -window_size + 1]` turned into
    # `[:0]` for window_size == 1 and returned an empty array.
    return (window_sums / window_counts)[: values.size]

In [None]:
# Training curves: raw per-episode data with a moving-average overlay.
def _plot_training_curve(values, *, name, xlabel, ylabel, raw_alpha=None):
    """Draw one figure with the raw series and its moving average.

    Args:
        values: 1-D sequence of numbers to plot.
        name: Series name, used for the title and the legend labels.
        xlabel: Label of the x axis.
        ylabel: Label of the y axis.
        raw_alpha: Optional transparency of the raw-data line.
    """
    plt.figure(figsize=(10, 6))
    plt.plot(values, label=f'{name} (Raw Data)', alpha=raw_alpha)
    plt.plot(moving_average(values), label=f'{name} (Moving Average)', color='orange')
    plt.title(name)
    plt.xlabel(xlabel)
    plt.ylabel(ylabel)
    plt.legend()
    plt.show()
    plt.close()


_plot_training_curve(rewards_list, name='Returns', xlabel='Episode', ylabel='Return', raw_alpha=0.5)
_plot_training_curve(lengths_list, name='Lengths', xlabel='Episode', ylabel='Length', raw_alpha=0.5)

# `PPO_losses` is not produced by the training loop in this notebook, so the
# loss figure is drawn only when such a series exists; referencing the name
# unconditionally raised NameError on a fresh "Restart & Run All".
_ppo_losses = globals().get('PPO_losses')
if _ppo_losses is not None and len(_ppo_losses) > 0:
    _plot_training_curve(_ppo_losses, name='Losses', xlabel='Batch', ylabel='Loss')
else:
    print('PPO_losses is not defined (or is empty); skipping the loss plot.')