In [6]:
import tensorflow as tf
import numpy as np


In [7]:
import random
import math
from card import Card
from constants import NEW, LEARNING, RELEARNING, REVIEW

# --- Simulation Parameters ---
WPM_BASE = 50 # Base words per minute for a simulated user
WPM_RANGE = 10 # How much WPM can fluctuate

def simulate_performance(card: Card, seconds_elapsed: int):
    """
    Simulates a user's performance on a flashcard, determining if they remember it
    and what their recall speed (WPM) would be.
    """
    state = card.learn_state
    stability = {NEW: 0.1, LEARNING: 0.3, RELEARNING: 0.5, REVIEW: 0.7}

    # Probability of remembering based on forgetting curve equation
    probability_of_remembering = math.exp(- (seconds_elapsed / (24*60*60)) / stability[state])

    # Simulate WPM based on the card's state
    if state == NEW or state == LEARNING:
        simulated_wpm = random.uniform(WPM_BASE * 0.4, WPM_BASE * 0.8)
    elif state == RELEARNING:
        simulated_wpm = random.uniform(WPM_BASE * 0.7, WPM_BASE * 0.9)
    else: 
        simulated_wpm = random.uniform(WPM_BASE * 0.8, WPM_BASE * 1.1)
        
    # Introduce randomness to the WPM
    simulated_wpm += random.uniform(-WPM_RANGE, WPM_RANGE)
    simulated_wpm = int(max(10, simulated_wpm)) # Ensure WPM doesn't go below 10

    # Determine the outcome
    remembers = random.random() < probability_of_remembering

    return remembers, simulated_wpm

In [8]:
class Environment:
    def __init__(self):
        self.card = Card('question', 'answer', 0)

    def reset(self):
        self.card = Card('question', 'answer', 0)
        return np.array([self.card.learn_state / 3, 0, 0])

    def step(self, action):
        # Simulate taking a step in the environment

        interval_length = [
            600,    # 10 minutes
            86400,  # 1 day
            259200, # 3 days
            604800, # 1 week
            2592000,# 1 month
            7776000 # 3 months
        ]
        # action 0: review again in 10 minutes
        remembers, wpm = simulate_performance(self.card, interval_length[action])

        self.card.review(wpm, remembers)

        #next state, reward, done, info
        next_state = np.array([
            self.card.learn_state / 3,
            self.card.wpm / self.card.max_wpm if self.card.max_wpm > 0 else 0,
            interval_length[action] / 7776000  # Normalize interval length to a range of 0 to 1
        ])

        if remembers:
            reward = wpm/self.card.max_wpm if self.card.max_wpm > 0 else 0
            reward += interval_length[action] / 7776000
        else:
            reward = -1
        done = False if action != 5 or not remembers or reward < 0.5 else True
        info = None
        return next_state, reward, done, info

In [9]:
env = Environment()
input_shape = [3]  # State space normalized WPM, learning state, 
n_outputs = 6  # Action space (10 min, 1 day, 3 days, 1 week, 1 month, 3 months)

model = tf.keras.Sequential([
    tf.keras.layers.InputLayer(shape=input_shape),
    tf.keras.layers.Dense(32, activation='elu'),
    tf.keras.layers.Dense(32, activation='elu'),
    tf.keras.layers.Dense(32, activation='elu'),
    tf.keras.layers.Dense(n_outputs)
])

In [10]:
def epsilon_greedy_action(state, epsilon):
    if np.random.rand() < epsilon:
        return np.random.randint(0, n_outputs)
    else:
        Q_values = model.predict(np.array(state[np.newaxis]))
        return np.argmax(Q_values)

In [11]:
from collections import deque

replay_buffer = deque(maxlen=10000)

def sample_experiences(batch_size):
    indices = np.random.randint(len(replay_buffer), size=batch_size)
    batch = [replay_buffer[i] for i in indices]
    states, actions, rewards, next_states, dones = map(np.array, zip(*batch))
    return states, actions, rewards, next_states, dones

In [12]:
def play_one_step(env, state, epsilon):
    action = epsilon_greedy_action(state, epsilon)
    next_state, reward, done, _ = env.step(action)
    replay_buffer.append((state, action, reward, next_state, done))
    return next_state, reward, done

In [13]:
batch_size = 32
discount_factor = 0.90
optimizer = tf.keras.optimizers.Adam(learning_rate=1e-3)
loss_fn = tf.keras.losses.MeanSquaredError()

def train_step(batch_size):
    states, actions, rewards, next_states, dones = sample_experiences(batch_size)
    next_Q_values = model.predict(next_states)
    max_next_Q_values = np.max(next_Q_values, axis=1)
    target_Q_values = rewards + (1 - dones) * discount_factor * max_next_Q_values

    mask = tf.one_hot(actions, n_outputs)
    with tf.GradientTape() as tape:
        Q_values = model(states)
        Q_values = tf.reduce_sum(Q_values * mask, axis=1)
        loss = loss_fn(target_Q_values, Q_values)
        print(loss)
    grads = tape.gradient(loss, model.trainable_variables)
    optimizer.apply_gradients(zip(grads, model.trainable_variables))

In [14]:
for episode in range(500):
    # obs = env.reset()
    obs = env.reset()
    epsilon = max(1 - episode / 500, 0.01)
    for step in range(50):
        obs, reward, done = play_one_step(env, obs, epsilon)
        if done:
            break
    if episode >= 50:
        train_step(batch_size)
        print('train step')

model.save('spaced_repetition_model.keras')
        

[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 94ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 46ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 44ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 43ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 41ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 41ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 41ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 41ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 39ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 54ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 60ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 46ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 45ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 46

In [18]:
# test model
model = tf.keras.models.load_model('spaced_repetition_model.keras')
import datetime
# learn state, wpm normalized, days since last review normalized
test_states = [np.array([1, 1, 1]),
               np.array([0, 1, 0]),
               np.array([1, 0, 1])]

actions = [np.argmax(x) for x in model.predict(np.array(test_states))]
print(actions)
print('hi')





[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 61ms/step
[np.int64(5), np.int64(5), np.int64(2)]
hi
