In [7]:
import requests
import gymnasium as gym
import numpy as np
from gymnasium import spaces
import gymnasium as gym
from stable_baselines3 import DQN
from tenacity import retry, stop_after_attempt, wait_exponential
import logging
import time
from stable_baselines3.common.callbacks import BaseCallback
import urllib3
import os

In [16]:
# Set up the logger
log_filename = "maze_agent_run.log"  # Log file name

try:
    if os.path.exists(log_filename):
        os.remove(log_filename)
except:
    pass
# Set the logging level for urllib3 to WARNING
urllib3_logger = logging.getLogger("urllib3")
urllib3_logger.setLevel(logging.INFO)

logging.basicConfig(
    level=logging.DEBUG,  # Set the logging level
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",  # Format for log messages
    handlers=[
        logging.FileHandler(log_filename),  # Log to a file
        logging.StreamHandler()  # Log to the console
    ]
    
)

def get_info(response):
    if response.status_code != 200:
        print("error code at response ")
        
    done = response.json().get('done') if not None else False
    info = response.json().get('info') if not None else ""
    observation = response.json().get('observation')  
    observation = np.array(observation, dtype=np.int32)
    
    reward = response.json().get('reward') 
    trunc = response.json().get('trunc') if not None else False
    
    
    return done,info,observation,reward,trunc
    
# implement retry policy
@retry(stop=stop_after_attempt(5),wait=wait_exponential(multiplier=1,min=4,max=10))
def make_request(url,headers,data=None):
    if data:
        response = requests.post(url,headers=headers,json=data)
    else:
        response = requests.post(url,headers=headers)
    #raise http error for bad responses
    response.raise_for_status()
    #action = url.split("/")[3]
    
    
    logging.info(f"Action:{url} data {data}, response: {response.status_code}")
    #logging.info(f"request made :{url}, data:{data}, response: {response.status_code}")
    return response




In [17]:
api_new_game = "http://18.185.60.20:5005/new_game"
headers = {'Content-Type': 'application/json'}

# Start new game

response = make_request(url = api_new_game, headers=headers)
uuid = response.json().get('uuid')


step = "http://18.185.60.20:5005/step/"+uuid  
reset = "http://18.185.60.20:5005/reset"+uuid

response = make_request(reset,headers)


2024-09-10 19:05:47,678 - root - INFO - Action:http://18.185.60.20:5005/new_game data None, response: 200


RetryError: RetryError[<Future at 0x1dde02ae560 state=finished raised HTTPError>]

In [14]:
action_seq = [2,1,3,1,1,1,1,1,2,2,2]


for action in action_seq:
    
    content = {'action': int(action)}
    response = requests.post(step,headers=headers,json=content)
    done,info,observation,reward,trunc = get_info(response)
    print(f"Move: {action}, obs:{observation}, reward:{reward}, done: {done}")
    #plot_maze_position(observation)
    #time.sleep(1)
        

done,info,observation,reward,trunc = get_info(response)
print("-------------")
print(done)
print(observation)
print(reward)
print(trunc)


reseult = requests.post(reset,headers=headers)
done,info,observation,reward,trunc = get_info(response)
print(observation)

error code at response 


TypeError: int() argument must be a string, a bytes-like object or a real number, not 'NoneType'

In [6]:
class TrainingCallback(BaseCallback):
    def __init__(self, verbose=1):
        super(TrainingCallback, self).__init__(verbose)
    
    def _on_step(self) -> bool:
        # Log reward and step details
        if self.n_calls % 10 == 0:  # Log every 10th step
            logging.info(f"Step: {self.n_calls}, Reward: {self.locals['rewards']}")
        return True

    def _on_rollout_end(self) -> None:
        # Called at the end of each rollout (i.e., after each epoch)
        #logging.info(f"End of epoch. Total steps: {self.num_timesteps}")
        logging.info(f"Last rewards: {self.locals['rewards']}")
    



class MazeAPIEnv(gym.Env):
    def __init__(self, api_step_url, headers,api_new_game,uuid):
        super(MazeAPIEnv, self).__init__()
        
        self.uuid=uuid
        self.api_step_url = api_step_url  # URL for the API step endpoint
        self.headers = headers  # Headers for authorization or any other required fields
        self.api_new_game =api_new_game
        

        # Define the action and observation space
        # Define the action space: 0=up, 1=down, 2=right, 3=left
        self.action_space = spaces.Discrete(4)
        
        
        # For multi-dimensional observations, use Box space
        self.observation_space = spaces.Box(low=np.array([0, 0]), high=np.array([9, 9]), dtype=np.int32)

        self.current_state = None  # To hold the current state
        self.done = False
        
    def reset(self,seed=None,**kwargs):
        self.current_state = np.array([0, 0])
        self.done = False
        
        response = make_request(url=self.api_new_game, headers=headers)
        self.uuid = response.json().get('uuid') 
        
        self.api_step_url = self.api_step_url+self.uuid
        return self.current_state, response
        

    def step(self, action):
        # Send the action to the API
        content = {'action': int(action)}
        
        response = make_request(url=self.api_step_url, headers=self.headers, data=content)
    
        if response.status_code !=200 :
            print("error code in step")
        # Extract the response data
        done,info,observation,reward,truncated = get_info(response)
        
        
        # Update current state
        self.current_state = observation
        self.done = done
        

        logging.info(f"Current State = {observation}")      
          
        return observation, reward, done, truncated, info
    
    

# Define the API endpoint and headers
api_step_url = "http://18.185.60.20:5005/step/"  
api_new_game = "http://18.185.60.20:5005/new_game"
headers = {'Content-Type': 'application/json'}

# Start new game
response = make_request(url = api_new_game, headers=headers)
uuid = response.json().get('uuid')

# Instantiate the custom environment
env = MazeAPIEnv(api_step_url, headers,api_new_game,uuid)

# Create a DQN agent using the custom environment
model = DQN('MlpPolicy', env, verbose=2, exploration_fraction=0.2, exploration_final_eps=0.05)


# Train the agent
logging.info("Starting the training process...")
callback = TrainingCallback()


# Train model
model.learn(total_timesteps=1000, callback=callback)


# Save the trained model
model.save("dqn_maze_agent")
logging.info("Saved trained agent")

logging.info("Started evaluation")
# Evaluate the trained agent
num_episodes = 10
total_rewards = []
i = 0
max_steps = 100
for _ in range(num_episodes):
    
    obs,_ = env.reset()
    obs = np.array(obs)
    done = False
    episode_reward = 0
    steps = 0
    
    while not done and steps < max_steps:
        action, _ = model.predict(obs, deterministic=True)
        obs, reward, done, truncated, info = env.step(action)
        episode_reward += reward
        time.sleep(0.1)
        steps+=1

    total_rewards.append(episode_reward)
    i+=1
    logging.info(f"Evaluation episode:{i}, episode rewards: {episode_reward}")

average_reward = sum(total_rewards) / num_episodes
print(f"Average reward over {num_episodes} episodes: {average_reward}")


2024-09-10 18:56:08,332 - root - INFO - Action:http://18.185.60.20:5005/new_game data None, response: 200


Using cpu device
Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.


2024-09-10 18:56:09,528 - root - INFO - Starting the training process...
2024-09-10 18:56:09,644 - root - INFO - Action:http://18.185.60.20:5005/new_game data None, response: 200


RetryError: RetryError[<Future at 0x1dddfbb3220 state=finished raised HTTPError>]