In [1]:
import random
import numpy as np
from collections import deque
import numpy as np
import pandas as pd
from collections import defaultdict


In [2]:

def epsilon_greedy_search(Epsilon, qtable, state):
    if (random.random() < Epsilon):
        # 0 is 'apply matrix A', 1 is 'apply matrix B'
        # 2 is 'apply matrix C', 3 is 'apply matrix D'
        return random.choice([0, 1, 2, 3])
    else:
        # get the best move for the current state
        return best_move_for_a_state(Q_table=qtable, state=state)
    
# I would like to return the best move for a given state
def best_move_for_a_state(Q_table, state):
    vals = Q_table[str(state)]

    # if we haven't visited this state before, return a random choice of 0, 1, 2, or 3
    if vals==[0, 0, 0, 0]:
        return random.choice([0, 1, 2, 3])
    
    # if we have visited this state before, return the current best choice
    return np.argmax(vals)

# over a given state, return the maximum value of the table for that state
def max_a_prime(Q_table, state):
    return max(Q_table[str(state)])

In [3]:
# B is the inverse of A
A = np.array([[1, 1, 0], [0, 1, 0], [0, 0, 1]])
B = np.array([[1, -1, 0], [0, 1, 0], [0, 0, 1]])

# C is the inverse of D
C = np.array([[1, 0, 0], [0, 1, 1], [0, 0, 1]])
D = np.array([[1, 0, 0], [0, 1, -1], [0, 0, 1]])

# together, A, B, C, and D generate the heisenberg group

In [4]:
def getReward(matrix):
    if (matrix==A).all() or (matrix==B).all() or (matrix==C).all() or (matrix==D).all():
        return 20
    else:
        return -1 + 1/(2 + abs(matrix[0][1]) + abs(matrix[0][2]) + abs(matrix[1][2]))

In [5]:
df = pd.read_csv("heisenberg_data.csv")

In [6]:
def get_next_step(oldObs, action):
    # action is always either 0, 1, 2, or 3
    next_state = []
    if action==0:
        next_state = oldObs @ A
    elif action==1:
        next_state = oldObs @ B
    elif action==2:
        next_state = oldObs @ C
    else:
        next_state = oldObs @ D
    curReward = getReward(next_state)
    done = curReward==20
    return (next_state, curReward, done)
    

In [7]:
# adapted from CS 540 Spring 2023 HW 10
EPISODES = 30000
LEARNING_RATE = .1
DISCOUNT_FACTOR = .99
EPSILON = 1
EPSILON_DECAY = .999

random.seed(1)

# starts with an estimate of zero reward for each state.
# adapted from ChatGPT
# the outer dictionary has keys of the string version of the given array, and 
# values of a dictionary for each of the actions that we could take at that state
Q_table = defaultdict(lambda: [0, 0, 0, 0]) 

episode_reward_record = deque(maxlen=100)

for i in range(EPISODES):
    episode_reward = 0
    done = False
    # choose a random starting row
    # adapted from https://stackoverflow.com/questions/15923826/random-row-selection-in-pandas-dataframe
    cur_row = df.sample(1)
    obs = np.array([
        [1, int(cur_row['val1']), int(cur_row['val2'])], 
        [0, 1, int(cur_row['val3'])], 
        [0, 0, 1]
        ])

    index = 1

    while (not done):
        # perform an epsilon greedy action 
        # Q(s, a) = (1-LEARNING_RATE)Q(s, a) + (LEARNING_RATE)(r + DISCOUNT_FACTOR(max a'(Q(s', a'))))
        action = epsilon_greedy_search(Epsilon=EPSILON, qtable=Q_table, state=obs)

        oldObs = obs
        obs,reward,done = get_next_step(oldObs, action)
        Q_table[str(oldObs)][action] = (1-LEARNING_RATE) * Q_table[str(oldObs)][action] + (LEARNING_RATE) * (reward + DISCOUNT_FACTOR * (max_a_prime(Q_table, obs)))

        episode_reward += reward # update episode reward

        index += 1
        # if we take more than 100 steps, end this iteration early (we are probably not making progress)
        if index > 100:
            done=True

    # decay the epsilon
    EPSILON *= EPSILON_DECAY

    # record the reward for this episode
    episode_reward_record.append(episode_reward) 

    if i%100 ==0 and i>0:
        print("Average reward for the last 100 iterations: " + str(sum(list(episode_reward_record))/100))
        print("epsilon: " + str(EPSILON) )



Average reward for the last 100 iterations: -94.84581157451171
epsilon: 0.9038873549665959
Average reward for the last 100 iterations: -94.44941621299665
epsilon: 0.8178301806491574
Average reward for the last 100 iterations: -92.23202317001227
epsilon: 0.7399663251239436
Average reward for the last 100 iterations: -93.05534316200347
epsilon: 0.6695157201007336
Average reward for the last 100 iterations: -88.70811567379366
epsilon: 0.6057725659163237
Average reward for the last 100 iterations: -86.86443128075612
epsilon: 0.548098260578011
Average reward for the last 100 iterations: -85.90764878882217
epsilon: 0.4959150020176678
Average reward for the last 100 iterations: -82.34057331625243
epsilon: 0.44869999946146477
Average reward for the last 100 iterations: -82.81209912684376
epsilon: 0.4059802359226587
Average reward for the last 100 iterations: -84.80373653089877
epsilon: 0.36732772934619257
Average reward for the last 100 iterations: -85.38248200602844
epsilon: 0.332355244929545

In [17]:
Q_table[str(B@B)]

[19.99999999999999,
 0.0008089479999999982,
 1.4744393101090116,
 0.3702767568863788]

In [9]:
# test with the other dataframe. 
test_df = pd.read_csv("heisenberg_data_test.csv")

In [10]:
test_df
cur_row = test_df.sample(1)

In [11]:
cur_matrix = np.array([[1, int(cur_row['val1']), int(cur_row['val2'])], [0, 1, int(cur_row['val3'])], [0, 0, 1]])

In [12]:
outputs = Q_table[str(cur_matrix)]
if outputs==[0, 0, 0, 0]:
    print("Problem.")
index = np.argmax(outputs)
new_matrix = cur_matrix
if index==0:
    new_matrix = cur_matrix @ A
elif index==1:
    new_matrix = cur_matrix @ B
elif index==2:
    new_matrix = cur_matrix @ C
elif index==3:
    new_matrix = cur_matrix @ D

cur_matrix = new_matrix
new_matrix

array([[ 1, -5, 41],
       [ 0,  1, -8],
       [ 0,  0,  1]])

In [13]:
def matrix_to_num_steps(cur_matrix):
    index = 1
    for i in range(50):
        if (cur_matrix==A).all() or (cur_matrix==B).all() or (cur_matrix==C).all() or (cur_matrix==D).all():
            return i
        outputs = Q_table[str(cur_matrix)]
        print(outputs)
        if outputs==[0, 0, 0, 0]:
            # this is a problem because we haven't seen this state before 
            # in training so we have no idea how to handle it
            print("Problem.")
        index = np.argmax(outputs)
        if index==0:
            cur_matrix = cur_matrix @ A
        elif index==1:
            cur_matrix = cur_matrix @ B
        elif index==2:
            cur_matrix = cur_matrix @ C
        elif index==3:
            cur_matrix = cur_matrix @ D
    return 100

In [14]:
print(B@C@B)

matrix_to_num_steps(B@C@B)

[[ 1 -2 -1]
 [ 0  1  1]
 [ 0  0  1]]
[-0.21680000000000005, -0.25810473428571434, -0.28254638208174604, -0.19046711425327917]
[12.325334272250958, -0.15833333333333335, -0.24144704761904767, -0.17613209000000002]
[-0.1407741666666667, -0.08742500000000002, 18.4429905118514, -0.23431904761904765]
[19.99999999999999, 1.3336181919354897, 6.369154925478906, 0.32662102670568594]


4

In [15]:
mymat = B@B@C@B@D@A@A@C@A
print(mymat)

Q_table[str(mymat)]

# cur_matrix = D@D@B@D@A

# matrix_to_num_steps(B@B@C)

[[1 0 0]
 [0 1 1]
 [0 0 1]]


[0, 0, 0, 0]

In [16]:
B@B@C@B@D@A

array([[ 1, -2,  1],
       [ 0,  1,  0],
       [ 0,  0,  1]])

In [24]:
# adapted from ChatGPT
class CustomDefaultDict(dict):
    def __init__(self, default_factory, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.default_factory = default_factory

    def __missing__(self, key):
        # Compute the default value based on the missing key
        default_value = self.default_factory(key)
        self[key] = default_value  # Cache the default value for future lookups
        return default_value

# Example usage:
def default_value_for_key(key):
    # Define a function that computes the default value based on the key
    val1 = key[0][1]
    val2 = key[0][2]
    val3 = key[1][2]
    cur_matrix = np.array([[1, val1, val2], [0, 1, val3], [0, 0, 1]])
    output1 = getReward(cur_matrix @ A)
    output2 = getReward(cur_matrix @ B)
    output3 = getReward(cur_matrix @ C)
    output4 = getReward(cur_matrix @ D)
    return [output1, output2, output3, output4]

custom_dict = CustomDefaultDict(default_value_for_key)

custom_dict[list(A@A)]

TypeError: unhashable type: 'list'

In [25]:
str(list(A@A))

'[array([1, 2, 0]), array([0, 1, 0]), array([0, 0, 1])]'

In [22]:
list(A)[0][0]

1