SARSA

In [None]:
import gym
import numpy as np
import matplotlib.pyplot as plt 

# Environment
env = gym.make("MountainCar-v0")

# Training schedule
EPISODES = 30000
EPISODE_DISPLAY = 500	# stats are reported (and the episode rendered) every this many episodes

# Number of buckets used per observation dimension when discretising states
DISCRETE_BUCKETS = 20

# Temporal-difference hyperparameters
LEARNING_RATE = 0.1
DISCOUNT = 0.95

# Epsilon-greedy exploration; the decrement is sized so that EPSILON
# reaches zero after EPISODES//4 episodes
EPSILON = 0.5
EPSILON_DECREMENTER = EPSILON/(EPISODES//4)

# Q-table of shape (DISCRETE_BUCKETS, DISCRETE_BUCKETS, number of actions),
# initialised with standard-normal random values
q_shape = (DISCRETE_BUCKETS, DISCRETE_BUCKETS, env.action_space.n)
Q_TABLE = np.random.randn(*q_shape)

# Performance measures: raw per-episode rewards plus periodically sampled summaries
ep_rewards = []
ep_rewards_table = {key: [] for key in ('ep', 'avg', 'min', 'max')}

def discretized_state(state):
	"""Map a continuous observation onto a tuple of integer bucket indices.

	Each observation dimension is split into DISCRETE_BUCKETS equal-width
	windows between the environment's low and high bounds.

	Args:
		state: observation array with one entry per observation dimension.

	Returns:
		Tuple of integer indices, each in [0, DISCRETE_BUCKETS - 1], usable
		directly as an index into the Q-table.
	"""
	low = env.observation_space.low
	high = env.observation_space.high
	# The list repetition must be parenthesised: without the parentheses the
	# expression is evaluated left to right as (range / [BUCKETS]) * len(high),
	# which doubles the window and leaves half of the table unused.
	win_size = (high - low) / ([DISCRETE_BUCKETS] * len(high))
	# Built-in int replaces the np.int alias, which newer NumPy no longer provides.
	indices = ((state - low) // win_size).astype(int)
	# An observation sitting exactly on the upper bound would land in bucket
	# DISCRETE_BUCKETS, one past the end of the table, so clamp into range.
	indices = np.clip(indices, 0, DISCRETE_BUCKETS - 1)
	return tuple(indices)		#integer tuple for extracting Q table values

# SARSA training loop: on-policy TD control with epsilon-greedy action selection.
for episode in range(EPISODES):
	# Render only the episodes whose statistics are also reported.
	render_state = (episode % EPISODE_DISPLAY == 0)
	episode_reward = 0
	done = False

	# S <- initial state, A <- epsilon-greedy action for S
	curr_discrete_state = discretized_state(env.reset())
	if np.random.random() <= EPSILON:
		action = np.random.randint(0, env.action_space.n)	# exploration
	else:
		action = np.argmax(Q_TABLE[curr_discrete_state])	# exploitation

	while not done:
		new_state, reward, done, _ = env.step(action)
		new_discrete_state = discretized_state(new_state)

		# A' <- epsilon-greedy action for S'
		if np.random.random() <= EPSILON:
			new_action = np.random.randint(0, env.action_space.n)
		else:
			new_action = np.argmax(Q_TABLE[new_discrete_state])

		if render_state:
			env.render()

		sa_index = curr_discrete_state + (action,)
		if done:
			# Only a finish at or beyond the goal position changes the table;
			# any other kind of episode end leaves it untouched.
			if new_state[0] >= env.goal_position:
				Q_TABLE[sa_index] = 0
		else:
			# Q(S,A) <- Q(S,A) + alpha*[R + gamma*Q(S',A') - Q(S,A)]
			q_sa = Q_TABLE[sa_index]
			q_next = Q_TABLE[new_discrete_state + (new_action,)]
			td_target = reward + DISCOUNT*q_next
			Q_TABLE[sa_index] = q_sa + LEARNING_RATE*(td_target - q_sa)

		# S <- S', A <- A'
		curr_discrete_state, action = new_discrete_state, new_action

		episode_reward += reward

	EPSILON -= EPSILON_DECREMENTER

	ep_rewards.append(episode_reward)

	if episode % EPISODE_DISPLAY == 0:
		# Summarise the most recent EPISODE_DISPLAY episodes.
		recent = ep_rewards[-EPISODE_DISPLAY:]
		avg_reward = sum(recent)/len(recent)
		ep_rewards_table['ep'].append(episode)
		ep_rewards_table['avg'].append(avg_reward)
		ep_rewards_table['min'].append(min(recent))
		ep_rewards_table['max'].append(max(recent))

		print(f"Episode:{episode} avg:{avg_reward} min:{min(ep_rewards[-EPISODE_DISPLAY:])} max:{max(ep_rewards[-EPISODE_DISPLAY:])}")

env.close()

# Plot the periodically sampled avg/min/max episode rewards against the episode number.
episodes_axis = ep_rewards_table['ep']
for series in ('avg', 'min', 'max'):
	plt.plot(episodes_axis, ep_rewards_table[series], label=series)
plt.legend(loc=4)
plt.title('Mountain Car SARSA')
plt.ylabel('Average reward/Episode')
plt.xlabel('Episodes')
plt.show()


Dyna-Q

In [None]:
import gym
import numpy as np
import matplotlib.pyplot as plt

# Environment
env = gym.make("MountainCar-v0")

# Training schedule
EPISODES = 30000
EPISODE_DISPLAY = 500	# stats are reported every this many episodes

# Number of buckets used per observation dimension when discretising states
DISCRETE_BUCKETS = 20

# Temporal-difference hyperparameters
LEARNING_RATE = 0.1
DISCOUNT = 0.95

# Epsilon-greedy exploration; the decrement is sized so that EPSILON
# reaches zero after EPISODES//4 episodes
EPSILON = 0.5
EPSILON_DECREMENTER = EPSILON/(EPISODES//4)

# Number of simulated (planning) updates per real environment step
DYNA_N = 10

# Q-table of shape (DISCRETE_BUCKETS, DISCRETE_BUCKETS, number of actions);
# every entry starts at -0.01
q_shape = (DISCRETE_BUCKETS, DISCRETE_BUCKETS, env.action_space.n)
Q_TABLE = np.ones(q_shape)/(-100)

# Learned model, indexed by (flattened state, action); every entry starts at -0.01.
# The two tables are separate arrays: next flattened state, and observed reward.
model_shape = (DISCRETE_BUCKETS*DISCRETE_BUCKETS, env.action_space.n)
Model_transitions = np.ones(model_shape)/(-100)
Model_rewards = np.ones(model_shape)/(-100)

# Planning is switched on by the training loop after the first win
start_dyna = False

# For stats
ep_rewards = []
ep_rewards_table = {key: [] for key in ('ep', 'avg', 'min', 'max')}

def discretized_state(state):
	"""Map a continuous observation onto a tuple of integer bucket indices.

	Each observation dimension is split into DISCRETE_BUCKETS equal-width
	windows between the environment's low and high bounds.

	Args:
		state: observation array with one entry per observation dimension.

	Returns:
		Tuple of integer indices, each in [0, DISCRETE_BUCKETS - 1], usable
		directly as an index into the Q-table.
	"""
	low = env.observation_space.low
	high = env.observation_space.high
	# The list repetition must be parenthesised: without the parentheses the
	# expression is evaluated left to right as (range / [BUCKETS]) * len(high),
	# which doubles the window and leaves half of the table unused.
	win_size = (high - low) / ([DISCRETE_BUCKETS] * len(high))
	# Built-in int replaces the np.int alias, which newer NumPy no longer provides.
	indices = ((state - low) // win_size).astype(int)
	# An observation sitting exactly on the upper bound would land in bucket
	# DISCRETE_BUCKETS, one past the end of the table, so clamp into range.
	indices = np.clip(indices, 0, DISCRETE_BUCKETS - 1)
	return tuple(indices)		#integer tuple for extracting Q table values

def flatten_state(state, buckets=None):
	"""Collapse a 2-D discrete state into a single row-major index.

	Args:
		state: pair (i, j) of bucket indices.
		buckets: number of buckets per dimension; defaults to the
			module-level DISCRETE_BUCKETS so the index always matches the
			table sizes instead of relying on a hard-coded 20.

	Returns:
		i*buckets + j.
	"""
	if buckets is None:
		buckets = DISCRETE_BUCKETS
	return state[0]*buckets+state[1]

def unflatten_state(state, buckets=None):
	"""Expand a row-major flat index back into a 2-D discrete state.

	Args:
		state: flat index; may be a float, since the model table that stores
			these indices is a float array.
		buckets: number of buckets per dimension; defaults to the
			module-level DISCRETE_BUCKETS so the decoding always matches the
			table sizes instead of relying on a hard-coded 20.

	Returns:
		Tuple (i, j) of Python ints such that i*buckets + j == state.
	"""
	if buckets is None:
		buckets = DISCRETE_BUCKETS
	return (int(state//buckets) , int(state%buckets))

# Dyna-Q training loop. Every real step performs (1) a direct Q-learning
# update, (2) a model update, and (3) once the goal has been reached at
# least once, DYNA_N simulated planning updates drawn from the model.
for episode in range(EPISODES):
	episode_reward = 0
	done = False

	curr_discrete_state = discretized_state(env.reset())

	# Only consumed by the optional rendering below, which is currently disabled.
	render_state = (episode % EPISODE_DISPLAY == 0)

	while not done:
		if np.random.random() > EPSILON:
			action = np.argmax(Q_TABLE[curr_discrete_state]) # action selection from Q_Table // exploitation
		else:
			action = np.random.randint(0, env.action_space.n) # Epsilon random exploration

		new_state, reward, done, _ = env.step(action)
		new_discrete_state = discretized_state(new_state)

		#if render_state:
			#env.render()

		if not done:
			# (1) Direct RL: Q(S,A) <-- Q(S,A) + alpha[R + gamma*maxQ(S',a) - Q(S,A)]
			max_future_q = np.max(Q_TABLE[new_discrete_state])
			current_q = Q_TABLE[curr_discrete_state+(action,)]
			new_q = current_q + LEARNING_RATE*(reward + DISCOUNT*max_future_q - current_q)
			Q_TABLE[curr_discrete_state+(action,)]=new_q

			# (2) Model learning: Model(S,A) <-- S', R (states flattened from 20,20 into 400)
			flattened_curr = flatten_state(curr_discrete_state)
			flattened_new = flatten_state(new_discrete_state)
			Model_transitions[flattened_curr][action]=flattened_new
			Model_rewards[flattened_curr][action]=reward

			# (3) Planning. The previous condition `dyna_count<DYNA_N & start_dyna`
			# parsed as `dyna_count < (DYNA_N & start_dyna)` because bitwise & binds
			# tighter than <; for DYNA_N = 10 that is `< 0`, so planning never ran.
			if start_dyna:
				# A pair (s,a) has been observed iff its stored next state is a valid
				# (non-negative) flat index; unvisited entries still hold the negative
				# initial value. The model is not modified during planning, so the
				# mask is computed once per real step.
				visited = Model_transitions >= 0
				visited_states = np.where(visited.any(axis=1))[0]
				for planning_step in range(DYNA_N):
					# Randomly select a previously observed state and action
					state_sample = np.random.choice(visited_states)
					action_sample = np.random.choice(np.where(visited[state_sample])[0])

					# S', R <- Model(s,a). Dedicated names are used so the real step's
					# `reward` and `new_discrete_state` (needed after this branch)
					# are not overwritten by simulated experience.
					sim_next_state = unflatten_state(Model_transitions[state_sample][action_sample])
					sim_reward = Model_rewards[state_sample][action_sample]
					sim_index = unflatten_state(state_sample) + (action_sample,)

					# Q(s,a) <-- Q(s,a) + alpha[R + gamma*maxQ(s',a') - Q(s,a)]
					sim_max_future_q = np.max(Q_TABLE[sim_next_state])
					sim_current_q = Q_TABLE[sim_index]
					Q_TABLE[sim_index] = sim_current_q + LEARNING_RATE*(sim_reward + DISCOUNT*sim_max_future_q - sim_current_q)

		elif new_state[0] >= env.goal_position:  # win condition
			Q_TABLE[curr_discrete_state + (action,)] = 0
			start_dyna=True	# enable planning from now on

		curr_discrete_state = new_discrete_state # S <-- S'
		episode_reward += reward # reward tracker

	EPSILON = EPSILON - EPSILON_DECREMENTER

	ep_rewards.append(episode_reward)

	if not episode % EPISODE_DISPLAY:
		# Summarise the most recent EPISODE_DISPLAY episodes.
		recent_rewards = ep_rewards[-EPISODE_DISPLAY:]
		avg_reward = sum(recent_rewards)/len(recent_rewards)
		ep_rewards_table['ep'].append(episode)
		ep_rewards_table['avg'].append(avg_reward)
		ep_rewards_table['min'].append(min(recent_rewards))
		ep_rewards_table['max'].append(max(recent_rewards))

		print(f"Episode:{episode} avg:{avg_reward} min:{min(ep_rewards[-EPISODE_DISPLAY:])} max:{max(ep_rewards[-EPISODE_DISPLAY:])}")

env.close()

# Plot the periodically sampled avg/min/max episode rewards against the episode number.
episodes_axis = ep_rewards_table['ep']
for series in ('avg', 'min', 'max'):
	plt.plot(episodes_axis, ep_rewards_table[series], label=series)
plt.legend(loc=4)
plt.title('Mountain Car Dyna-Q')
plt.ylabel('Average reward/Episode')
plt.xlabel('Episodes')
plt.show()

Dyna-SARSA

In [None]:
import gym
import numpy as np
import matplotlib.pyplot as plt 

# Environment
env = gym.make("MountainCar-v0")

#Environment values
print(env.observation_space.high)	#[0.6  0.07]
print(env.observation_space.low)	#[-1.2  -0.07]
print(env.action_space.n)			#3

# Training schedule
EPISODES = 30000
EPISODE_DISPLAY = 500	# stats are reported every this many episodes

# Number of buckets used per observation dimension when discretising states
DISCRETE_BUCKETS = 20

# Temporal-difference hyperparameters
LEARNING_RATE = 0.1
DISCOUNT = 0.95

# Epsilon-greedy exploration; the decrement is sized so that EPSILON
# reaches zero after EPISODES//4 episodes
EPSILON = 0.5
EPSILON_DECREMENTER = EPSILON/(EPISODES//4)

# Number of simulated (planning) updates per real environment step
DYNA_N = 10

# Q-table of shape (DISCRETE_BUCKETS, DISCRETE_BUCKETS, number of actions);
# every entry starts at -0.01
q_shape = (DISCRETE_BUCKETS, DISCRETE_BUCKETS, env.action_space.n)
Q_TABLE = np.ones(q_shape)/(-100)

# Learned model, indexed by (flattened state, action); every entry starts at -0.01.
# The two tables are separate arrays: next flattened state, and observed reward.
model_shape = (DISCRETE_BUCKETS*DISCRETE_BUCKETS, env.action_space.n)
Model_transitions = np.ones(model_shape)/(-100)
Model_rewards = np.ones(model_shape)/(-100)

# Planning is switched on by the training loop after the first win
start_dyna = False

# For stats
ep_rewards = []
ep_rewards_table = {key: [] for key in ('ep', 'avg', 'min', 'max')}

def discretised_state(state):
	"""Map a continuous observation onto a tuple of integer bucket indices.

	Each observation dimension is split into DISCRETE_BUCKETS equal-width
	windows between the environment's low and high bounds.

	Args:
		state: observation array with one entry per observation dimension.

	Returns:
		Tuple of integer indices, each in [0, DISCRETE_BUCKETS - 1], usable
		directly as an index into the Q-table.
	"""
	low = env.observation_space.low
	high = env.observation_space.high
	# The list repetition must be parenthesised: without the parentheses the
	# expression is evaluated left to right as (range / [BUCKETS]) * len(high),
	# which doubles the window and leaves half of the table unused.
	win_size = (high - low) / ([DISCRETE_BUCKETS] * len(high))
	# Built-in int replaces the np.int alias, which newer NumPy no longer provides.
	indices = ((state - low) // win_size).astype(int)
	# An observation sitting exactly on the upper bound would land in bucket
	# DISCRETE_BUCKETS, one past the end of the table, so clamp into range.
	indices = np.clip(indices, 0, DISCRETE_BUCKETS - 1)
	return tuple(indices)		#integer tuple for extracting Q table values

def flatten_state(state, buckets=None):
	"""Collapse a 2-D discrete state into a single row-major index.

	Args:
		state: pair (i, j) of bucket indices.
		buckets: number of buckets per dimension; defaults to the
			module-level DISCRETE_BUCKETS so the index always matches the
			table sizes instead of relying on a hard-coded 20.

	Returns:
		i*buckets + j.
	"""
	if buckets is None:
		buckets = DISCRETE_BUCKETS
	return state[0]*buckets+state[1]

def unflatten_state(state, buckets=None):
	"""Expand a row-major flat index back into a 2-D discrete state.

	Args:
		state: flat index; may be a float, since the model table that stores
			these indices is a float array.
		buckets: number of buckets per dimension; defaults to the
			module-level DISCRETE_BUCKETS so the decoding always matches the
			table sizes instead of relying on a hard-coded 20.

	Returns:
		Tuple (i, j) of Python ints such that i*buckets + j == state.
	"""
	if buckets is None:
		buckets = DISCRETE_BUCKETS
	return (int(state//buckets) , int(state%buckets))

# Dyna-SARSA training loop. Every real step performs (1) a direct SARSA
# update, (2) a model update, and (3) once the goal has been reached at
# least once, DYNA_N simulated planning updates drawn from the model.
for episode in range(EPISODES):
	episode_reward = 0
	done = False

	# Only consumed by the optional rendering below, which is currently disabled.
	render_state = (episode % EPISODE_DISPLAY == 0)

	# S <- initial state, A <- epsilon-greedy action for S
	curr_discrete_state = discretised_state(env.reset())
	if np.random.random() > EPSILON:
		action = np.argmax(Q_TABLE[curr_discrete_state])    # action selection from Q_Table // exploitation
	else:
		action = np.random.randint(0, env.action_space.n)   # Epsilon random exploration

	while not done:		# new state and action chosen
		new_state, reward, done, _ = env.step(action)
		new_discrete_state = discretised_state(new_state)

		# A' <- epsilon-greedy action for S'
		if np.random.random() > EPSILON:
			new_action = np.argmax(Q_TABLE[new_discrete_state])  # new action selection from Q_Table // exploitation
		else:
			new_action = np.random.randint(0, env.action_space.n)   # Epsilon random exploration

		#if render_state:
			#env.render()

		if not done:
			# (1) Direct RL: Q(S,A) <-- Q(S,A) + alpha[R + gamma*Q(S',A') - Q(S,A)]
			current_q = Q_TABLE[curr_discrete_state+(action,)]
			next_q = Q_TABLE[new_discrete_state+(new_action,)]
			new_q = current_q + LEARNING_RATE*(reward+DISCOUNT*next_q-current_q)
			Q_TABLE[curr_discrete_state+(action,)]=new_q

			# (2) Model learning: Model(S,A) <-- S', R (states flattened from 20,20 into 400)
			flattened_curr = flatten_state(curr_discrete_state)
			flattened_new = flatten_state(new_discrete_state)
			Model_transitions[flattened_curr][action]=flattened_new
			Model_rewards[flattened_curr][action]=reward

			# (3) Planning. The previous condition `dyna_count<DYNA_N & start_dyna`
			# parsed as `dyna_count < (DYNA_N & start_dyna)` because bitwise & binds
			# tighter than <; for DYNA_N = 10 that is `< 0`, so planning never ran.
			if start_dyna:
				# A pair (s,a) has been observed iff its stored next state is a valid
				# (non-negative) flat index; unvisited entries still hold the negative
				# initial value. The model is not modified during planning, so the
				# mask is computed once per real step.
				visited = Model_transitions >= 0
				visited_states = np.where(visited.any(axis=1))[0]
				for planning_step in range(DYNA_N):
					# Randomly select a previously observed state and action
					state_sample = np.random.choice(visited_states)
					action_sample = np.random.choice(np.where(visited[state_sample])[0])

					# s', R <- Model(s,a). Dedicated names are used so the real step's
					# `reward` and `new_discrete_state` (needed after this branch)
					# are not overwritten by simulated experience.
					sim_next_state = unflatten_state(Model_transitions[state_sample][action_sample])
					sim_reward = Model_rewards[state_sample][action_sample]
					sim_index = unflatten_state(state_sample) + (action_sample,)

					# The simulated backup bootstraps from the greedy value max_a Q(s',a),
					# exactly as the original planning code was written.
					sim_max_future_q = np.max(Q_TABLE[sim_next_state])
					sim_current_q = Q_TABLE[sim_index]
					Q_TABLE[sim_index] = sim_current_q + LEARNING_RATE*(sim_reward + DISCOUNT*sim_max_future_q - sim_current_q)

		elif new_state[0] >= env.goal_position:     # win condition
			Q_TABLE[curr_discrete_state + (action,)] = 0   # 0 instead of the -1
			start_dyna=True	# enable planning from now on

		curr_discrete_state = new_discrete_state    # S<-S'
		action = new_action # A<-A'

		episode_reward += reward    # reward tracker

	EPSILON = EPSILON - EPSILON_DECREMENTER

	ep_rewards.append(episode_reward)

	if not episode % EPISODE_DISPLAY:
		# Summarise the most recent EPISODE_DISPLAY episodes.
		recent_rewards = ep_rewards[-EPISODE_DISPLAY:]
		avg_reward = sum(recent_rewards)/len(recent_rewards)
		ep_rewards_table['ep'].append(episode)
		ep_rewards_table['avg'].append(avg_reward)
		ep_rewards_table['min'].append(min(recent_rewards))
		ep_rewards_table['max'].append(max(recent_rewards))

		print(f"Episode:{episode} avg:{avg_reward} min:{min(ep_rewards[-EPISODE_DISPLAY:])} max:{max(ep_rewards[-EPISODE_DISPLAY:])}")

env.close()

# Plot the periodically sampled avg/min/max episode rewards against the episode number.
episodes_axis = ep_rewards_table['ep']
for series in ('avg', 'min', 'max'):
	plt.plot(episodes_axis, ep_rewards_table[series], label=series)
plt.legend(loc=4)
plt.title('Mountain Car Dyna-SARSA')
plt.ylabel('Average reward/Episode')
plt.xlabel('Episodes')
plt.show()