<a href="https://colab.research.google.com/github/abhinavarorags/CoolStuff/blob/test/PolicyGradient.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
#Neural Net Policy Gradient
import numpy as np
import pandas as pd
import tensorflow as tf
from collections import deque
import random

# Hyperparameters
episodes = 10
learning_rate = 0.001
batch_size = 64
memory_size = 10000


def load_data(file_path, url='https://raw.githubusercontent.com/abhinavarorags/CoolStuff/refs/heads/test/sample_data.csv'):
    # Load dataset
    try:
        data = pd.read_csv(file_path)
    except FileNotFoundError:
        # If file not found, download from URL
        data = pd.read_csv(url)
    # Splitting dataset into training and testing
    train_data = data.iloc[:20000]
    test_data = data.iloc[20000:]
    return train_data, test_data


# Load dataset
file_path = '/mnt/data/sample_data.csv'
train_data, test_data = load_data(file_path)

# Placeholder dataset parameters
state_space = train_data.shape[1] - 1  # Number of features in state (assuming last column is the action/reward)
action_space = 3  # Number of possible actions (Buy Long, Sell Short, Hold)


# Neural Network for Policy Gradient Reinforcement Learning
class PolicyGradient(tf.keras.Model):
    def __init__(self, action_space):
        super(PolicyGradient, self).__init__()
        self.fc1 = tf.keras.layers.Dense(24, activation='relu')
        self.fc2 = tf.keras.layers.Dense(24, activation='relu')
        self.out = tf.keras.layers.Dense(action_space, activation='softmax')

    def call(self, x):
        x = self.fc1(x)
        x = self.fc2(x)
        return self.out(x)


# Experience Replay Memory
memory = deque(maxlen=memory_size)


def remember(state, action, reward):
    memory.append((state, action, reward))


def act(model, state):
    state = np.array([state])
    probs = model(state).numpy()[0]
    return np.random.choice(action_space, p=probs)


def train_model(model, optimizer):
    for e in range(episodes):
        sample = train_data.sample()  # Randomly sample a row from training data
        state = sample.select_dtypes(include=[np.number]).iloc[:, :state_space].values.flatten()  # Use only numeric features as state
        episode_memory = []
        total_reward = 0
        done = False
        while not done:
            action = act(model, state)
            next_sample = train_data.sample()  # Get next state from training data
            next_state = next_sample.select_dtypes(include=[np.number]).iloc[:, :state_space].values.flatten()
            if action == 0:  # Buy Long
                reward = float(next_sample.select_dtypes(include=[np.number]).iloc[0, -1]) - float(sample.select_dtypes(include=[np.number]).iloc[0, -1])  # Positive reward if price goes up
            elif action == 1:  # Sell Short
                reward = float(sample.select_dtypes(include=[np.number]).iloc[0, -1]) - float(next_sample.select_dtypes(include=[np.number]).iloc[0, -1])  # Positive reward if price goes down
            else:  # Hold
                reward = -0.1  # Small penalty to discourage holding if unnecessary
            episode_memory.append((state, action, reward))
            state = next_state
            total_reward += reward
            done = np.random.rand() > 0.95  # Randomly ending the episode

        # Update policy
        with tf.GradientTape() as tape:
            loss = 0
            for state, action, reward in episode_memory:
                state = np.array([state])
                probs = model(state)
                action_prob = probs[0, action]
                loss -= tf.math.log(action_prob) * reward  # Negative log likelihood weighted by reward
        grads = tape.gradient(loss, model.trainable_variables)
        optimizer.apply_gradients(zip(grads, model.trainable_variables))
        print(f"Episode {e+1}/{episodes}, Total Reward: {total_reward:.2f}")
    print("Training completed.")


def test_model(model, test_episodes=100):
    correct_predictions = {"Buy Long": 0, "Sell Short": 0, "Hold": 0}
    wrong_predictions = {"Buy Long": 0, "Sell Short": 0, "Hold": 0}

    def test_act(model, state):
        state = np.array([state])
        probs = model(state).numpy()[0]
        return np.argmax(probs)

    for e in range(test_episodes):
        sample = test_data.sample()  # Randomly sample a row from testing data
        state = sample.select_dtypes(include=[np.number]).iloc[:, :state_space].values.flatten()  # Use only numeric features as state
        done = False
        while not done:
            action = test_act(model, state)
            next_sample = test_data.sample()  # Get next state from testing data
            next_state = next_sample.select_dtypes(include=[np.number]).iloc[:, :state_space].values.flatten()
            if action == 0:  # Buy Long
                reward = float(next_sample.select_dtypes(include=[np.number]).iloc[0, -1]) - float(sample.select_dtypes(include=[np.number]).iloc[0, -1])  # Positive reward if price goes up
            elif action == 1:  # Sell Short
                reward = float(sample.select_dtypes(include=[np.number]).iloc[0, -1]) - float(next_sample.select_dtypes(include=[np.number]).iloc[0, -1])  # Positive reward if price goes down
            else:  # Hold
                reward = -0.1  # Small penalty to discourage holding if unnecessary
            done = np.random.rand() > 0.95  # Randomly ending the episode
            action_name = ["Buy Long", "Sell Short", "Hold"][action]
            if reward > 0.7:  # Assuming a reward above 0.7 is a correct prediction
                correct_predictions[action_name] += 1
            else:
                wrong_predictions[action_name] += 1
            state = next_state

    print("Test Data saved with recommendations")
    print("\t\tCorrect Predictions  Wrong Predictions")
    for action in ["Buy Long", "Sell Short", "Hold"]:
        print(f"{action}\t\t{correct_predictions[action]}\t\t\t{wrong_predictions[action]}")


# Load, Train and Test Model
model = PolicyGradient(action_space)
optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate)

train_model(model, optimizer)
test_model(model)


Episode 1/10, Total Reward: -1.66
Episode 2/10, Total Reward: -6.88
Episode 3/10, Total Reward: -3.62
Episode 4/10, Total Reward: 2.84
Episode 5/10, Total Reward: -1.02
Episode 6/10, Total Reward: -0.57
Episode 7/10, Total Reward: -4.18
Episode 8/10, Total Reward: 0.96
Episode 9/10, Total Reward: -0.26
Episode 10/10, Total Reward: -1.15
Training completed.
Test Data saved with recommendations
		Correct Predictions  Wrong Predictions
Buy Long		0			1
Sell Short		5			1848
Hold		0			265


In [5]:
#RNN-RL
import numpy as np
import pandas as pd
import tensorflow as tf
from collections import deque
import random

# Hyperparameters
episodes = 10
learning_rate = 0.001
batch_size = 64
memory_size = 10000


def load_data(file_path, url='https://raw.githubusercontent.com/abhinavarorags/CoolStuff/refs/heads/test/sample_data.csv'):
    # Load dataset
    try:
        data = pd.read_csv(file_path)
    except FileNotFoundError:
        # If file not found, download from URL
        data = pd.read_csv(url)
    # Splitting dataset into training and testing
    train_data = data.iloc[:20000]
    test_data = data.iloc[20000:]
    return train_data, test_data


# Load dataset
file_path = '/mnt/data/sample_data.csv'
train_data, test_data = load_data(file_path)

# Placeholder dataset parameters
state_space = train_data.shape[1] - 1  # Number of features in state (assuming last column is the action/reward)
action_space = 3  # Number of possible actions (Buy Long, Sell Short, Hold)


# Neural Network for RNN-based Reinforcement Learning
class RNNPolicyGradient(tf.keras.Model):
    def __init__(self, action_space):
        super(RNNPolicyGradient, self).__init__()
        self.lstm = tf.keras.layers.LSTM(24, return_sequences=False, return_state=False)
        self.fc1 = tf.keras.layers.Dense(24, activation='relu')
        self.out = tf.keras.layers.Dense(action_space, activation='softmax')

    def call(self, x):
        x = self.lstm(x)
        x = self.fc1(x)
        return self.out(x)


# Experience Replay Memory
memory = deque(maxlen=memory_size)


def remember(state, action, reward):
    memory.append((state, action, reward))


def act(model, state):
    state = np.array([state])
    state = state.reshape((1, state.shape[-1], 1))  # Reshape for LSTM input
    probs = model(state).numpy()[0]
    return np.random.choice(action_space, p=probs)


def train_model(model, optimizer):
    for e in range(episodes):
        sample = train_data.sample()  # Randomly sample a row from training data
        state = sample.select_dtypes(include=[np.number]).iloc[:, :state_space].values.flatten()  # Use only numeric features as state
        episode_memory = []
        total_reward = 0
        done = False
        while not done:
            action = act(model, state)
            next_sample = train_data.sample()  # Get next state from training data
            next_state = next_sample.select_dtypes(include=[np.number]).iloc[:, :state_space].values.flatten()
            if action == 0:  # Buy Long
                reward = float(next_sample.select_dtypes(include=[np.number]).iloc[0, -1]) - float(sample.select_dtypes(include=[np.number]).iloc[0, -1])  # Positive reward if price goes up
            elif action == 1:  # Sell Short
                reward = float(sample.select_dtypes(include=[np.number]).iloc[0, -1]) - float(next_sample.select_dtypes(include=[np.number]).iloc[0, -1])  # Positive reward if price goes down
            else:  # Hold
                reward = -0.1  # Small penalty to discourage holding if unnecessary
            episode_memory.append((state, action, reward))
            state = next_state
            total_reward += reward
            done = np.random.rand() > 0.95  # Randomly ending the episode

        # Update policy
        with tf.GradientTape() as tape:
            loss = 0
            for state, action, reward in episode_memory:
                state = np.array([state])
                state = state.reshape((1, state.shape[-1], 1))  # Reshape for LSTM input
                probs = model(state)
                action_prob = probs[0, action]
                loss -= tf.math.log(action_prob) * reward  # Negative log likelihood weighted by reward
        grads = tape.gradient(loss, model.trainable_variables)
        optimizer.apply_gradients(zip(grads, model.trainable_variables))
        print(f"Episode {e+1}/{episodes}, Total Reward: {total_reward:.2f}")
    print("Training completed.")


def test_model(model, test_episodes=100):
    correct_predictions = {"Buy Long": 0, "Sell Short": 0, "Hold": 0}
    wrong_predictions = {"Buy Long": 0, "Sell Short": 0, "Hold": 0}

    def test_act(model, state):
        state = np.array([state])
        state = state.reshape((1, state.shape[-1], 1))  # Reshape for LSTM input
        probs = model(state).numpy()[0]
        return np.argmax(probs)

    for e in range(test_episodes):
        sample = test_data.sample()  # Randomly sample a row from testing data
        state = sample.select_dtypes(include=[np.number]).iloc[:, :state_space].values.flatten()  # Use only numeric features as state
        done = False
        while not done:
            action = test_act(model, state)
            next_sample = test_data.sample()  # Get next state from testing data
            next_state = next_sample.select_dtypes(include=[np.number]).iloc[:, :state_space].values.flatten()
            if action == 0:  # Buy Long
                reward = float(next_sample.select_dtypes(include=[np.number]).iloc[0, -1]) - float(sample.select_dtypes(include=[np.number]).iloc[0, -1])  # Positive reward if price goes up
            elif action == 1:  # Sell Short
                reward = float(sample.select_dtypes(include=[np.number]).iloc[0, -1]) - float(next_sample.select_dtypes(include=[np.number]).iloc[0, -1])  # Positive reward if price goes down
            else:  # Hold
                reward = -0.1  # Small penalty to discourage holding if unnecessary
            done = np.random.rand() > 0.95  # Randomly ending the episode
            action_name = ["Buy Long", "Sell Short", "Hold"][action]
            if reward > 0.7:  # Assuming a reward above 0.7 is a correct prediction
                correct_predictions[action_name] += 1
            else:
                wrong_predictions[action_name] += 1
            state = next_state

    print("Test Data saved with recommendations")
    print("\t\tCorrect Predictions  Wrong Predictions")
    for action in ["Buy Long", "Sell Short", "Hold"]:
        print(f"{action}\t\t{correct_predictions[action]}\t\t\t{wrong_predictions[action]}")


# Load, Train and Test Model
model = RNNPolicyGradient(action_space)
optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate)

train_model(model, optimizer)
test_model(model)


Episode 1/10, Total Reward: 0.38
Episode 2/10, Total Reward: -2.31
Episode 3/10, Total Reward: -0.32
Episode 4/10, Total Reward: 0.16
Episode 5/10, Total Reward: -2.05
Episode 6/10, Total Reward: -0.20
Episode 7/10, Total Reward: -0.93
Episode 8/10, Total Reward: -0.44
Episode 9/10, Total Reward: -0.23
Episode 10/10, Total Reward: -0.43
Training completed.
Test Data saved with recommendations
		Correct Predictions  Wrong Predictions
Buy Long		0			1
Sell Short		0			1721
Hold		0			0


In [1]:
#DRQN
import numpy as np
import pandas as pd
import tensorflow as tf
from collections import deque
import random

# Hyperparameters
episodes = 10
learning_rate = 0.001
batch_size = 64
memory_size = 10000
gamma = 0.99


def load_data(file_path, url='https://raw.githubusercontent.com/abhinavarorags/CoolStuff/refs/heads/test/sample_data.csv'):
    # Load dataset
    try:
        data = pd.read_csv(file_path)
    except FileNotFoundError:
        # If file not found, download from URL
        data = pd.read_csv(url)
    # Splitting dataset into training and testing
    train_data = data.iloc[:20000]
    test_data = data.iloc[20000:]
    return train_data, test_data


# Load dataset
file_path = '/mnt/data/sample_data.csv'
train_data, test_data = load_data(file_path)

# Placeholder dataset parameters
state_space = train_data.shape[1] - 1  # Number of features in state (assuming last column is the action/reward)
action_space = 3  # Number of possible actions (Buy Long, Sell Short, Hold)


# Neural Network for DRQN
class DRQN(tf.keras.Model):
    def __init__(self, action_space):
        super(DRQN, self).__init__()
        self.lstm = tf.keras.layers.LSTM(24, return_sequences=False, return_state=False)
        self.fc1 = tf.keras.layers.Dense(24, activation='relu')
        self.out = tf.keras.layers.Dense(action_space, activation='linear')

    def call(self, x):
        x = self.lstm(x)
        x = self.fc1(x)
        return self.out(x)


# Experience Replay Memory
memory = deque(maxlen=memory_size)


def remember(state, action, reward, next_state, done):
    memory.append((state, action, reward, next_state, done))


def act(model, state, epsilon):
    if np.random.rand() <= epsilon:
        return random.randrange(action_space)
    state = np.array([state])
    state = state.reshape((1, state.shape[-1], 1))  # Reshape for LSTM input
    q_values = model(state).numpy()[0]
    return np.argmax(q_values)


def replay(model, target_model, optimizer):
    if len(memory) < batch_size:
        return
    minibatch = random.sample(memory, batch_size)
    for state, action, reward, next_state, done in minibatch:
        target = reward
        if not done:
            next_state = np.array([next_state]).reshape((1, next_state.shape[-1], 1))
            target = reward + gamma * np.amax(target_model(next_state).numpy()[0])
        state = np.array([state]).reshape((1, state.shape[-1], 1))
        target_f = model(state).numpy()
        target_f[0][action] = target
        with tf.GradientTape() as tape:
            q_values = model(state)
            loss = tf.keras.losses.MSE(target_f, q_values)
        grads = tape.gradient(loss, model.trainable_variables)
        optimizer.apply_gradients(zip(grads, model.trainable_variables))


def train_model(model, target_model, optimizer):
    epsilon = 1.0
    epsilon_min = 0.01
    epsilon_decay = 0.995
    for e in range(episodes):
        sample = train_data.sample()  # Randomly sample a row from training data
        state = sample.select_dtypes(include=[np.number]).iloc[:, :state_space].values.flatten()  # Use only numeric features as state
        total_reward = 0
        done = False
        while not done:
            action = act(model, state, epsilon)
            next_sample = train_data.sample()  # Get next state from training data
            next_state = next_sample.select_dtypes(include=[np.number]).iloc[:, :state_space].values.flatten()
            if action == 0:  # Buy Long
                reward = float(next_sample.select_dtypes(include=[np.number]).iloc[0, -1]) - float(sample.select_dtypes(include=[np.number]).iloc[0, -1])  # Positive reward if price goes up
            elif action == 1:  # Sell Short
                reward = float(sample.select_dtypes(include=[np.number]).iloc[0, -1]) - float(next_sample.select_dtypes(include=[np.number]).iloc[0, -1])  # Positive reward if price goes down
            else:  # Hold
                reward = -0.1  # Small penalty to discourage holding if unnecessary
            done = np.random.rand() > 0.95  # Randomly ending the episode
            remember(state, action, reward, next_state, done)
            state = next_state
            total_reward += reward
        replay(model, target_model, optimizer)
        if epsilon > epsilon_min:
            epsilon *= epsilon_decay
        target_model.set_weights(model.get_weights())
        print(f"Episode {e+1}/{episodes}, Total Reward: {total_reward:.2f}, Epsilon: {epsilon:.2f}")
    print("Training completed.")


def test_model(model, test_episodes=100):
    correct_predictions = {"Buy Long": 0, "Sell Short": 0, "Hold": 0}
    wrong_predictions = {"Buy Long": 0, "Sell Short": 0, "Hold": 0}

    def test_act(model, state):
        state = np.array([state])
        state = state.reshape((1, state.shape[-1], 1))  # Reshape for LSTM input
        q_values = model(state).numpy()[0]
        return np.argmax(q_values)

    for e in range(test_episodes):
        sample = test_data.sample()  # Randomly sample a row from testing data
        state = sample.select_dtypes(include=[np.number]).iloc[:, :state_space].values.flatten()  # Use only numeric features as state
        done = False
        while not done:
            action = test_act(model, state)
            next_sample = test_data.sample()  # Get next state from testing data
            next_state = next_sample.select_dtypes(include=[np.number]).iloc[:, :state_space].values.flatten()
            if action == 0:  # Buy Long
                reward = float(next_sample.select_dtypes(include=[np.number]).iloc[0, -1]) - float(sample.select_dtypes(include=[np.number]).iloc[0, -1])  # Positive reward if price goes up
            elif action == 1:  # Sell Short
                reward = float(sample.select_dtypes(include=[np.number]).iloc[0, -1]) - float(next_sample.select_dtypes(include=[np.number]).iloc[0, -1])  # Positive reward if price goes down
            else:  # Hold
                reward = -0.1  # Small penalty to discourage holding if unnecessary
            done = np.random.rand() > 0.95  # Randomly ending the episode
            action_name = ["Buy Long", "Sell Short", "Hold"][action]
            if reward > 0.7:  # Assuming a reward above 0.7 is a correct prediction
                correct_predictions[action_name] += 1
            else:
                wrong_predictions[action_name] += 1
            state = next_state

    print("Test Data saved with recommendations")
    print("\t\tCorrect Predictions  Wrong Predictions")
    for action in ["Buy Long", "Sell Short", "Hold"]:
        print(f"{action}\t\t{correct_predictions[action]}\t\t\t{wrong_predictions[action]}")


# Load, Train and Test Model
model = DRQN(action_space)
target_model = DRQN(action_space)

# Initialize the target model by building it with dummy input
_dummy_state = np.zeros((1, state_space), dtype=np.float32)
model(tf.convert_to_tensor(_dummy_state.reshape((1, state_space, 1))))
target_model(tf.convert_to_tensor(_dummy_state.reshape((1, state_space, 1))))

# Set target model weights
target_model.set_weights(model.get_weights())
optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate)

train_model(model, target_model, optimizer)
test_model(model)


Episode 1/10, Total Reward: -0.88, Epsilon: 0.99
Episode 2/10, Total Reward: -0.18, Epsilon: 0.99
Episode 3/10, Total Reward: 0.20, Epsilon: 0.99
Episode 4/10, Total Reward: -0.04, Epsilon: 0.98
Episode 5/10, Total Reward: -0.76, Epsilon: 0.98
Episode 6/10, Total Reward: -1.18, Epsilon: 0.97
Episode 7/10, Total Reward: -1.08, Epsilon: 0.97
Episode 8/10, Total Reward: 0.14, Epsilon: 0.96
Episode 9/10, Total Reward: 0.17, Epsilon: 0.96
Episode 10/10, Total Reward: -0.32, Epsilon: 0.95
Training completed.
Test Data saved with recommendations
		Correct Predictions  Wrong Predictions
Buy Long		0			432
Sell Short		0			1707
Hold		0			121


In [2]:
#DRQN - modified
#DRQN
import numpy as np
import pandas as pd
import tensorflow as tf
from collections import deque
import random

# Hyperparameters
episodes = 100
learning_rate = 0.001
batch_size = 64
memory_size = 10000
gamma = 0.99


def load_data(file_path, url='https://raw.githubusercontent.com/abhinavarorags/CoolStuff/refs/heads/test/sample_data.csv'):
    # Load dataset
    try:
        data = pd.read_csv(file_path)
    except FileNotFoundError:
        # If file not found, download from URL
        data = pd.read_csv(url)
    # Splitting dataset into training and testing
    train_data = data.iloc[:20000]
    test_data = data.iloc[20000:]
    return train_data, test_data


# Load dataset
file_path = '/mnt/data/sample_data.csv'
train_data, test_data = load_data(file_path)

# Placeholder dataset parameters
state_space = train_data.shape[1] - 1  # Number of features in state (assuming last column is the action/reward)
action_space = 3  # Number of possible actions (Buy Long, Sell Short, Hold)


# Neural Network for DRQN
class DRQN(tf.keras.Model):
    def __init__(self, action_space):
        super(DRQN, self).__init__()
        self.lstm = tf.keras.layers.LSTM(24, return_sequences=False, return_state=False)
        self.fc1 = tf.keras.layers.Dense(24, activation='relu')
        self.out = tf.keras.layers.Dense(action_space, activation='linear')

    def call(self, x):
        x = self.lstm(x)
        x = self.fc1(x)
        return self.out(x)


# Experience Replay Memory
memory = deque(maxlen=memory_size)


def remember(state, action, reward, next_state, done):
    memory.append((state, action, reward, next_state, done))


def act(model, state, epsilon):
    if np.random.rand() <= epsilon:
        return random.randrange(action_space)
    state = np.array([state])
    state = state.reshape((1, state.shape[-1], 1))  # Reshape for LSTM input
    q_values = model(state).numpy()[0]
    return np.argmax(q_values)


def replay(model, target_model, optimizer):
    if len(memory) < batch_size:
        return
    minibatch = random.sample(memory, batch_size)
    for state, action, reward, next_state, done in minibatch:
        target = reward
        if not done:
            next_state = np.array([next_state]).reshape((1, next_state.shape[-1], 1))
            target = reward + gamma * np.amax(target_model(next_state).numpy()[0])
        state = np.array([state]).reshape((1, state.shape[-1], 1))
        target_f = model(state).numpy()
        target_f[0][action] = target
        with tf.GradientTape() as tape:
            q_values = model(state)
            loss = tf.keras.losses.MSE(target_f, q_values)
        grads = tape.gradient(loss, model.trainable_variables)
        optimizer.apply_gradients(zip(grads, model.trainable_variables))


def train_model(model, target_model, optimizer):
    epsilon = 0.5
    epsilon_min = 0.01
    epsilon_decay = 0.98
    for e in range(episodes):
        sample = train_data.sample()  # Randomly sample a row from training data
        state = sample.select_dtypes(include=[np.number]).iloc[:, :state_space].values.flatten()  # Use only numeric features as state
        total_reward = 0
        done = False
        while not done:
            action = act(model, state, epsilon)
            next_sample = train_data.sample()  # Get next state from training data
            next_state = next_sample.select_dtypes(include=[np.number]).iloc[:, :state_space].values.flatten()
            if action == 0:  # Buy Long
                reward = float(next_sample.select_dtypes(include=[np.number]).iloc[0, -1]) - float(sample.select_dtypes(include=[np.number]).iloc[0, -1])  # Positive reward if price goes up
            elif action == 1:  # Sell Short
                reward = float(sample.select_dtypes(include=[np.number]).iloc[0, -1]) - float(next_sample.select_dtypes(include=[np.number]).iloc[0, -1])  # Positive reward if price goes down
            else:  # Hold
                reward = -0.5  # Increased penalty to discourage holding if unnecessary
            done = np.random.rand() > 0.9  # Increase chance of ending episode to encourage more diverse training
            remember(state, action, reward, next_state, done)
            state = next_state
            total_reward += reward
        replay(model, target_model, optimizer)
        if epsilon > epsilon_min:
            epsilon *= epsilon_decay
        target_model.set_weights(model.get_weights())
        print(f"Episode {e+1}/{episodes}, Total Reward: {total_reward:.2f}, Epsilon: {epsilon:.2f}")
    print("Training completed.")


def test_model(model, test_episodes=100):
    correct_predictions = {"Buy Long": 0, "Sell Short": 0, "Hold": 0}
    wrong_predictions = {"Buy Long": 0, "Sell Short": 0, "Hold": 0}

    def test_act(model, state):
        state = np.array([state])
        state = state.reshape((1, state.shape[-1], 1))  # Reshape for LSTM input
        q_values = model(state).numpy()[0]
        return np.argmax(q_values)

    for e in range(test_episodes):
        sample = test_data.sample()  # Randomly sample a row from testing data
        state = sample.select_dtypes(include=[np.number]).iloc[:, :state_space].values.flatten()  # Use only numeric features as state
        done = False
        while not done:
            action = test_act(model, state)
            next_sample = test_data.sample()  # Get next state from testing data
            next_state = next_sample.select_dtypes(include=[np.number]).iloc[:, :state_space].values.flatten()
            if action == 0:  # Buy Long
                reward = float(next_sample.select_dtypes(include=[np.number]).iloc[0, -1]) - float(sample.select_dtypes(include=[np.number]).iloc[0, -1])  # Positive reward if price goes up
            elif action == 1:  # Sell Short
                reward = float(sample.select_dtypes(include=[np.number]).iloc[0, -1]) - float(next_sample.select_dtypes(include=[np.number]).iloc[0, -1])  # Positive reward if price goes down
            else:  # Hold
                reward = -0.1  # Small penalty to discourage holding if unnecessary
            done = np.random.rand() > 0.95  # Randomly ending the episode
            action_name = ["Buy Long", "Sell Short", "Hold"][action]
            if reward > 0.7:  # Assuming a reward above 0.7 is a correct prediction
                correct_predictions[action_name] += 1
            else:
                wrong_predictions[action_name] += 1
            state = next_state

    print("Test Data saved with recommendations")
    print("\t\tCorrect Predictions  Wrong Predictions")
    for action in ["Buy Long", "Sell Short", "Hold"]:
        print(f"{action}\t\t{correct_predictions[action]}\t\t\t{wrong_predictions[action]}")


# Load, Train and Test Model
model = DRQN(action_space)
target_model = DRQN(action_space)

# Initialize the target model by building it with dummy input
_dummy_state = np.zeros((1, state_space), dtype=np.float32)
model(tf.convert_to_tensor(_dummy_state.reshape((1, state_space, 1))))
target_model(tf.convert_to_tensor(_dummy_state.reshape((1, state_space, 1))))

# Set target model weights
target_model.set_weights(model.get_weights())
optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate)

train_model(model, target_model, optimizer)
test_model(model)


Episode 1/10, Total Reward: -1.02, Epsilon: 0.49
Episode 2/10, Total Reward: -2.05, Epsilon: 0.48
Episode 3/10, Total Reward: -3.53, Epsilon: 0.47
Episode 4/10, Total Reward: -1.18, Epsilon: 0.46
Episode 5/10, Total Reward: -19.66, Epsilon: 0.45
Episode 6/10, Total Reward: -0.72, Epsilon: 0.44
Episode 7/10, Total Reward: -0.53, Epsilon: 0.43
Episode 8/10, Total Reward: -0.55, Epsilon: 0.43
Episode 9/10, Total Reward: -0.95, Epsilon: 0.42
Episode 10/10, Total Reward: 0.19, Epsilon: 0.41
Training completed.
Test Data saved with recommendations
		Correct Predictions  Wrong Predictions
Buy Long		0			73
Sell Short		0			1859
Hold		0			0


In [1]:
#DRQN
import numpy as np
import pandas as pd
import tensorflow as tf
from collections import deque
import random

# Hyperparameters
episodes = 100
learning_rate = 0.001
batch_size = 64
memory_size = 10000
gamma = 0.99


def load_data(file_path, url='https://raw.githubusercontent.com/abhinavarorags/CoolStuff/refs/heads/test/sample_data.csv'):
    # Load dataset
    try:
        data = pd.read_csv(file_path)
    except FileNotFoundError:
        # If file not found, download from URL
        data = pd.read_csv(url)
    # Splitting dataset into training and testing
    train_data = data.iloc[:40000]
    test_data = data.iloc[40000:]
    return train_data, test_data


# Load dataset
file_path = '/mnt/data/sample_data.csv'
train_data, test_data = load_data(file_path)

# Placeholder dataset parameters
state_space = train_data.shape[1] - 1  # Number of features in state (assuming last column is the action/reward)
action_space = 3  # Number of possible actions (Buy Long, Sell Short, Hold)


# Neural Network for DRQN
class DRQN(tf.keras.Model):
    def __init__(self, action_space):
        super(DRQN, self).__init__()
        self.lstm = tf.keras.layers.LSTM(24, return_sequences=False, return_state=False)
        self.fc1 = tf.keras.layers.Dense(24, activation='relu')
        self.out = tf.keras.layers.Dense(action_space, activation='linear')

    def call(self, x):
        x = self.lstm(x)
        x = self.fc1(x)
        return self.out(x)


# Experience Replay Memory
memory = deque(maxlen=memory_size)


def remember(state, action, reward, next_state, done):
    memory.append((state, action, reward, next_state, done))


def act(model, state, epsilon):
    if np.random.rand() <= epsilon:
        return random.randrange(action_space)
    state = np.array([state])
    state = state.reshape((1, state.shape[-1], 1))  # Reshape for LSTM input
    q_values = model(state).numpy()[0]
    return np.argmax(q_values)


def replay(model, target_model, optimizer):
    if len(memory) < batch_size:
        return
    minibatch = random.sample(memory, batch_size)
    for state, action, reward, next_state, done in minibatch:
        target = reward
        if not done:
            next_state = np.array([next_state]).reshape((1, next_state.shape[-1], 1))
            target = reward + gamma * np.amax(target_model(next_state).numpy()[0])
        state = np.array([state]).reshape((1, state.shape[-1], 1))
        target_f = model(state).numpy()
        target_f[0][action] = target
        with tf.GradientTape() as tape:
            q_values = model(state)
            loss = tf.keras.losses.MSE(target_f, q_values)
        grads = tape.gradient(loss, model.trainable_variables)
        optimizer.apply_gradients(zip(grads, model.trainable_variables))


def train_model(model, target_model, optimizer):
    epsilon = 0.5
    epsilon_min = 0.01
    epsilon_decay = 0.98
    for e in range(episodes):
        sample = train_data.sample()  # Randomly sample a row from training data
        state = sample.select_dtypes(include=[np.number]).iloc[:, :state_space].values.flatten()  # Use only numeric features as state
        total_reward = 0
        done = False
        while not done:
            action = act(model, state, epsilon)
            next_sample = train_data.sample()  # Get next state from training data
            next_state = next_sample.select_dtypes(include=[np.number]).iloc[:, :state_space].values.flatten()
            if action == 0:  # Buy Long
                reward = float(next_sample.select_dtypes(include=[np.number]).iloc[0, -1]) - float(sample.select_dtypes(include=[np.number]).iloc[0, -1])  # Positive reward if price goes up
            elif action == 1:  # Sell Short
                reward = float(sample.select_dtypes(include=[np.number]).iloc[0, -1]) - float(next_sample.select_dtypes(include=[np.number]).iloc[0, -1])  # Positive reward if price goes down
            else:  # Hold
                reward = -0.5  # Increased penalty to discourage holding if unnecessary
            done = np.random.rand() > 0.9  # Increase chance of ending episode to encourage more diverse training
            remember(state, action, reward, next_state, done)
            state = next_state
            total_reward += reward
        replay(model, target_model, optimizer)
        if epsilon > epsilon_min:
            epsilon *= epsilon_decay
        target_model.set_weights(model.get_weights())
        print(f"Episode {e+1}/{episodes}, Total Reward: {total_reward:.2f}, Epsilon: {epsilon:.2f}")
    print("Training completed.")


def test_model(model, test_episodes=100):
    correct_predictions = {"Buy Long": 0, "Sell Short": 0, "Hold": 0}
    wrong_predictions = {"Buy Long": 0, "Sell Short": 0, "Hold": 0}

    def test_act(model, state):
        state = np.array([state])
        state = state.reshape((1, state.shape[-1], 1))  # Reshape for LSTM input
        q_values = model(state).numpy()[0]
        return np.argmax(q_values)

    for e in range(test_episodes):
        sample = test_data.sample()  # Randomly sample a row from testing data
        state = sample.select_dtypes(include=[np.number]).iloc[:, :state_space].values.flatten()  # Use only numeric features as state
        done = False
        while not done:
            action = test_act(model, state)
            next_sample = test_data.sample()  # Get next state from testing data
            next_state = next_sample.select_dtypes(include=[np.number]).iloc[:, :state_space].values.flatten()
            if action == 0:  # Buy Long
                reward = float(next_sample.select_dtypes(include=[np.number]).iloc[0, -1]) - float(sample.select_dtypes(include=[np.number]).iloc[0, -1])  # Positive reward if price goes up
            elif action == 1:  # Sell Short
                reward = float(sample.select_dtypes(include=[np.number]).iloc[0, -1]) - float(next_sample.select_dtypes(include=[np.number]).iloc[0, -1])  # Positive reward if price goes down
            else:  # Hold
                reward = -0.1  # Small penalty to discourage holding if unnecessary
            done = np.random.rand() > 0.95  # Randomly ending the episode
            action_name = ["Buy Long", "Sell Short", "Hold"][action]
            if reward > 0.7:  # Assuming a reward above 0.7 is a correct prediction
                correct_predictions[action_name] += 1
            else:
                wrong_predictions[action_name] += 1
            state = next_state

    print("Test Data saved with recommendations")
    print("\t\tCorrect Predictions  Wrong Predictions")
    for action in ["Buy Long", "Sell Short", "Hold"]:
        print(f"{action}\t\t{correct_predictions[action]}\t\t\t{wrong_predictions[action]}")


# Load, Train and Test Model
model = DRQN(action_space)
target_model = DRQN(action_space)

# Initialize the target model by building it with dummy input
_dummy_state = np.zeros((1, state_space), dtype=np.float32)
model(tf.convert_to_tensor(_dummy_state.reshape((1, state_space, 1))))
target_model(tf.convert_to_tensor(_dummy_state.reshape((1, state_space, 1))))

# Set target model weights
target_model.set_weights(model.get_weights())
optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate)

train_model(model, target_model, optimizer)
test_model(model)


Episode 1/100, Total Reward: -0.33, Epsilon: 0.49
Episode 2/100, Total Reward: -0.39, Epsilon: 0.48
Episode 3/100, Total Reward: -1.13, Epsilon: 0.47
Episode 4/100, Total Reward: -0.03, Epsilon: 0.46
Episode 5/100, Total Reward: -1.97, Epsilon: 0.45
Episode 6/100, Total Reward: -0.09, Epsilon: 0.44
Episode 7/100, Total Reward: -0.10, Epsilon: 0.43
Episode 8/100, Total Reward: -2.51, Epsilon: 0.43
Episode 9/100, Total Reward: -0.51, Epsilon: 0.42
Episode 10/100, Total Reward: -0.73, Epsilon: 0.41
Episode 11/100, Total Reward: 0.23, Epsilon: 0.40
Episode 12/100, Total Reward: -1.50, Epsilon: 0.39
Episode 13/100, Total Reward: -0.01, Epsilon: 0.38
Episode 14/100, Total Reward: -0.50, Epsilon: 0.38
Episode 15/100, Total Reward: 0.07, Epsilon: 0.37
Episode 16/100, Total Reward: 0.23, Epsilon: 0.36
Episode 17/100, Total Reward: 0.65, Epsilon: 0.35
Episode 18/100, Total Reward: -1.49, Epsilon: 0.35
Episode 19/100, Total Reward: -2.14, Epsilon: 0.34
Episode 20/100, Total Reward: 0.19, Epsilon:

In [6]:
import numpy as np
import pandas as pd
import tensorflow as tf
from collections import deque
import random
from sklearn.preprocessing import StandardScaler

# Hyperparameters
episodes = 100
learning_rate = 0.001
batch_size = 64
memory_size = 10000
gamma = 0.99
window_size = 5  # Number of past timesteps to include in state representation


# Load and preprocess data
def load_data(file_path, url='https://raw.githubusercontent.com/abhinavarorags/CoolStuff/refs/heads/test/sample_data.csv'):
    # Load dataset
    try:
        data = pd.read_csv(file_path)
    except FileNotFoundError:
        # If file not found, download from URL
        data = pd.read_csv(url)

    # Convert datetime columns to numeric or drop them
    for col in data.columns:
        if pd.api.types.is_datetime64_any_dtype(data[col]) or data[col].dtype == 'object':
            try:
                data[col] = pd.to_datetime(data[col]).astype(int) / 10**9  # Convert datetime to timestamp
            except ValueError:
                data.drop(columns=[col], inplace=True)

    # Splitting dataset into training and testing
    train_data = data.iloc[:40000]
    test_data = data.iloc[40000:]

    # Scale features
    scaler = StandardScaler()
    train_data.iloc[:, :-1] = scaler.fit_transform(train_data.iloc[:, :-1].astype(float))
    test_data.iloc[:, :-1] = scaler.transform(test_data.iloc[:, :-1].astype(float))

    return train_data, test_data


# Load dataset
file_path = '/mnt/data/sample_data.csv'
train_data, test_data = load_data(file_path)

# Placeholder dataset parameters
state_space = train_data.shape[1] - 1  # Number of features in state (assuming last column is the action/reward)
action_space = 3  # Number of possible actions (Buy Long, Sell Short, Hold)


# Neural Network for DRQN
class DRQN(tf.keras.Model):
    def __init__(self, action_space):
        super(DRQN, self).__init__()
        self.lstm = tf.keras.layers.LSTM(64, return_sequences=False, return_state=False)
        self.fc1 = tf.keras.layers.Dense(64, activation='relu')
        self.fc2 = tf.keras.layers.Dense(32, activation='relu')
        self.out = tf.keras.layers.Dense(action_space, activation='linear')

    def call(self, x):
        x = tf.reshape(x, (x.shape[0], x.shape[1], x.shape[3]))  # Adjust input shape for LSTM layer
        x = self.lstm(x)
        x = self.fc1(x)
        x = self.fc2(x)
        return self.out(x)


# Experience Replay Memory
memory = deque(maxlen=memory_size)


def remember(state, action, reward, next_state, done):
    memory.append((state, action, reward, next_state, done))


def act(model, state, epsilon):
    if np.random.rand() <= epsilon:
        return random.randrange(action_space)
    state = np.array([state])
    q_values = model(state).numpy()[0]
    return np.argmax(q_values)


def replay(model, target_model, optimizer):
    if len(memory) < batch_size:
        return
    minibatch = random.sample(memory, batch_size)
    for state, action, reward, next_state, done in minibatch:
        target = reward
        if not done:
            target = reward + gamma * np.amax(target_model(np.array([next_state])).numpy()[0])
        target_f = model(np.array([state])).numpy()
        target_f[0][action] = target
        with tf.GradientTape() as tape:
            q_values = model(np.array([state]))
            loss = tf.keras.losses.MSE(target_f, q_values)
        grads = tape.gradient(loss, model.trainable_variables)
        optimizer.apply_gradients(zip(grads, model.trainable_variables))


def get_state_sequence(data, idx, window_size=5):
    start_idx = max(0, idx - window_size + 1)
    state_sequence = data.iloc[start_idx:idx + 1].select_dtypes(include=[np.number]).values
    if state_sequence.shape[0] < window_size:
        # Padding if the sequence is shorter than the window size
        padding = np.zeros((window_size - state_sequence.shape[0], state_sequence.shape[1]))
        state_sequence = np.vstack((padding, state_sequence))
    return state_sequence.reshape((1, window_size, 1, state_sequence.shape[1]))


def train_model(model, target_model, optimizer):
    epsilon = 0.5
    epsilon_min = 0.1
    epsilon_decay = 0.995
    for e in range(episodes):
        idx = random.randint(0, len(train_data) - 1)
        state = get_state_sequence(train_data, idx, window_size=window_size)
        total_reward = 0
        done = False
        while not done:
            action = act(model, state, epsilon)
            next_idx = random.randint(0, len(train_data) - 1)
            next_state = get_state_sequence(train_data, next_idx, window_size=window_size)
            if action == 0:  # Buy Long
                reward = float(train_data.iloc[next_idx, -1]) - float(train_data.iloc[idx, -1])
            elif action == 1:  # Sell Short
                reward = float(train_data.iloc[idx, -1]) - float(train_data.iloc[next_idx, -1])
            else:  # Hold
                reward = -0.5
            done = np.random.rand() > 0.9
            remember(state, action, reward, next_state, done)
            state = next_state
            total_reward += reward
        replay(model, target_model, optimizer)
        if epsilon > epsilon_min:
            epsilon *= epsilon_decay
        target_model.set_weights(model.get_weights())
        print(f"Episode {e+1}/{episodes}, Total Reward: {total_reward:.2f}, Epsilon: {epsilon:.2f}")
    print("Training completed.")


# Load, Train and Test Model
model = DRQN(action_space)
target_model = DRQN(action_space)

# Initialize the target model by building it with dummy input
_dummy_state = np.zeros((1, window_size, state_space), dtype=np.float32)
model(tf.convert_to_tensor(_dummy_state))
target_model(tf.convert_to_tensor(_dummy_state))

# Set target model weights
target_model.set_weights(model.get_weights())
optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate)

train_model(model, target_model, optimizer)


 -0.79234806]' has dtype incompatible with int64, please explicitly cast to a compatible dtype first.
  train_data.iloc[:, :-1] = scaler.fit_transform(train_data.iloc[:, :-1].astype(float))
  train_data.iloc[:, :-1] = scaler.fit_transform(train_data.iloc[:, :-1].astype(float))
 -0.20343584]' has dtype incompatible with int64, please explicitly cast to a compatible dtype first.
  test_data.iloc[:, :-1] = scaler.transform(test_data.iloc[:, :-1].astype(float))
  test_data.iloc[:, :-1] = scaler.transform(test_data.iloc[:, :-1].astype(float))


IndexError: Exception encountered when calling DRQN.call().

[1mtuple index out of range[0m

Arguments received by DRQN.call():
  • x=tf.Tensor(shape=(1, 5, 12), dtype=float32)

In [9]:
import numpy as np
import pandas as pd
import tensorflow as tf
from collections import deque
import random
from sklearn.preprocessing import StandardScaler

# Hyperparameters
episodes = 100
learning_rate = 0.001
batch_size = 64
memory_size = 10000
gamma = 0.99
window_size = 5  # Number of past timesteps to include in state representation


# Load and preprocess data
def load_data(file_path, url='https://raw.githubusercontent.com/abhinavarorags/CoolStuff/refs/heads/test/sample_data.csv'):
    # Load dataset
    try:
        data = pd.read_csv(file_path)
    except FileNotFoundError:
        # If file not found, download from URL
        data = pd.read_csv(url)

    # Convert datetime columns to numeric or drop them
    for col in data.columns:
        if pd.api.types.is_datetime64_any_dtype(data[col]) or data[col].dtype == 'object':
            try:
                data[col] = pd.to_datetime(data[col]).astype(int) / 10**9  # Convert datetime to timestamp
            except ValueError:
                data.drop(columns=[col], inplace=True)

    # Splitting dataset into training and testing
    train_data = data.iloc[:40000]
    test_data = data.iloc[40000:]

    # Scale features
    scaler = StandardScaler()
    train_data.iloc[:, :-1] = scaler.fit_transform(train_data.iloc[:, :-1].astype(float)).astype(np.float32)
    test_data.iloc[:, :-1] = scaler.transform(test_data.iloc[:, :-1].astype(float)).astype(np.float32)

    return train_data, test_data


# Load dataset
file_path = '/mnt/data/sample_data.csv'
train_data, test_data = load_data(file_path)

# Placeholder dataset parameters
state_space = train_data.shape[1] - 1  # Number of features in state (assuming last column is the action/reward)
action_space = 3  # Number of possible actions (Buy Long, Sell Short, Hold)


# Neural Network for DRQN
class DRQN(tf.keras.Model):
    def __init__(self, action_space):
        super(DRQN, self).__init__()
        self.lstm = tf.keras.layers.LSTM(64, return_sequences=False, return_state=False)
        self.fc1 = tf.keras.layers.Dense(64, activation='relu')
        self.fc2 = tf.keras.layers.Dense(32, activation='relu')
        self.out = tf.keras.layers.Dense(action_space, activation='linear')

    def call(self, x):
        x = tf.reshape(x, (x.shape[0], x.shape[1], -1))  # Adjust input shape for LSTM layer
        x = self.lstm(x)
        x = self.fc1(x)
        x = self.fc2(x)
        return self.out(x)


# Experience Replay Memory
memory = deque(maxlen=memory_size)


def remember(state, action, reward, next_state, done):
    memory.append((state, action, reward, next_state, done))


def act(model, state, epsilon):
    if np.random.rand() <= epsilon:
        return random.randrange(action_space)
    state = np.array([state])
    q_values = model(state).numpy()[0]
    return np.argmax(q_values)


def replay(model, target_model, optimizer):
    if len(memory) < batch_size:
        return
    minibatch = random.sample(memory, batch_size)
    for state, action, reward, next_state, done in minibatch:
        target = reward
        if not done:
            target = reward + gamma * np.amax(target_model(np.array([next_state])).numpy()[0])
        target_f = model(np.array([state])).numpy()
        target_f[0][action] = target
        with tf.GradientTape() as tape:
            q_values = model(np.array([state]))
            loss = tf.keras.losses.MSE(target_f, q_values)
        grads = tape.gradient(loss, model.trainable_variables)
        optimizer.apply_gradients(zip(grads, model.trainable_variables))


def get_state_sequence(data, idx, window_size=5):
    start_idx = max(0, idx - window_size + 1)
    state_sequence = data.iloc[start_idx:idx + 1].select_dtypes(include=[np.number]).values
    if state_sequence.shape[0] < window_size:
        # Padding if the sequence is shorter than the window size
        padding = np.zeros((window_size - state_sequence.shape[0], state_sequence.shape[1]))
        state_sequence = np.vstack((padding, state_sequence))
    return state_sequence.reshape((1, window_size, state_sequence.shape[1]))


def train_model(model, target_model, optimizer):
    epsilon = 0.5
    epsilon_min = 0.1
    epsilon_decay = 0.995
    for e in range(episodes):
        idx = random.randint(0, len(train_data) - 1)
        state = get_state_sequence(train_data, idx, window_size=window_size)
        total_reward = 0
        done = False
        while not done:
            action = act(model, state, epsilon)
            next_idx = random.randint(0, len(train_data) - 1)
            next_state = get_state_sequence(train_data, next_idx, window_size=window_size)
            if action == 0:  # Buy Long
                reward = float(train_data.iloc[next_idx, -1]) - float(train_data.iloc[idx, -1])
            elif action == 1:  # Sell Short
                reward = float(train_data.iloc[idx, -1]) - float(train_data.iloc[next_idx, -1])
            else:  # Hold
                reward = -0.5
            done = np.random.rand() > 0.9
            remember(state, action, reward, next_state, done)
            state = next_state
            total_reward += reward
        replay(model, target_model, optimizer)
        if epsilon > epsilon_min:
            epsilon *= epsilon_decay
        target_model.set_weights(model.get_weights())
        print(f"Episode {e+1}/{episodes}, Total Reward: {total_reward:.2f}, Epsilon: {epsilon:.2f}")
    print("Training completed.")


# Load, Train and Test Model
model = DRQN(action_space)
target_model = DRQN(action_space)

# Initialize the target model by building it with dummy input
_dummy_state = np.zeros((1, window_size, state_space), dtype=np.float32)
model(tf.convert_to_tensor(_dummy_state))
target_model(tf.convert_to_tensor(_dummy_state))

# Set target model weights
target_model.set_weights(model.get_weights())
optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate)

train_model(model, target_model, optimizer)


 -0.7923481 ]' has dtype incompatible with int64, please explicitly cast to a compatible dtype first.
  train_data.iloc[:, :-1] = scaler.fit_transform(train_data.iloc[:, :-1].astype(float)).astype(np.float32)
  train_data.iloc[:, :-1] = scaler.fit_transform(train_data.iloc[:, :-1].astype(float)).astype(np.float32)
 -0.20343584]' has dtype incompatible with int64, please explicitly cast to a compatible dtype first.
  test_data.iloc[:, :-1] = scaler.transform(test_data.iloc[:, :-1].astype(float)).astype(np.float32)
  test_data.iloc[:, :-1] = scaler.transform(test_data.iloc[:, :-1].astype(float)).astype(np.float32)


Episode 1/100, Total Reward: 55373400.00, Epsilon: 0.50
Episode 2/100, Total Reward: 70752599.50, Epsilon: 0.50
Episode 3/100, Total Reward: -5052002.50, Epsilon: 0.49
Episode 4/100, Total Reward: 15277499.50, Epsilon: 0.49
Episode 5/100, Total Reward: 140476800.00, Epsilon: 0.49
Episode 6/100, Total Reward: 4061400.00, Epsilon: 0.49
Episode 7/100, Total Reward: 38908498.00, Epsilon: 0.48
Episode 8/100, Total Reward: -145467304.50, Epsilon: 0.48
Episode 9/100, Total Reward: -89576102.00, Epsilon: 0.48
Episode 10/100, Total Reward: -34377902.00, Epsilon: 0.48
Episode 11/100, Total Reward: -62966400.50, Epsilon: 0.47
Episode 12/100, Total Reward: 233127898.50, Epsilon: 0.47
Episode 13/100, Total Reward: 102594899.50, Epsilon: 0.47
Episode 14/100, Total Reward: 13826699.50, Epsilon: 0.47
Episode 15/100, Total Reward: 275037597.50, Epsilon: 0.46
Episode 16/100, Total Reward: -11730900.00, Epsilon: 0.46
Episode 17/100, Total Reward: -21683100.50, Epsilon: 0.46
Episode 18/100, Total Reward: 

In [10]:
def test_model(model, test_episodes=100):
    correct_predictions = {"Buy Long": 0, "Sell Short": 0, "Hold": 0}
    wrong_predictions = {"Buy Long": 0, "Sell Short": 0, "Hold": 0}

    def test_act(model, state):
        state = np.array([state])
        state = state.reshape((1, state.shape[-1], 1))  # Reshape for LSTM input
        q_values = model(state).numpy()[0]
        return np.argmax(q_values)

    for e in range(test_episodes):
        sample = test_data.sample()  # Randomly sample a row from testing data
        state = sample.select_dtypes(include=[np.number]).iloc[:, :state_space].values.flatten()  # Use only numeric features as state
        done = False
        while not done:
            action = test_act(model, state)
            next_sample = test_data.sample()  # Get next state from testing data
            next_state = next_sample.select_dtypes(include=[np.number]).iloc[:, :state_space].values.flatten()
            if action == 0:  # Buy Long
                reward = float(next_sample.select_dtypes(include=[np.number]).iloc[0, -1]) - float(sample.select_dtypes(include=[np.number]).iloc[0, -1])  # Positive reward if price goes up
            elif action == 1:  # Sell Short
                reward = float(sample.select_dtypes(include=[np.number]).iloc[0, -1]) - float(next_sample.select_dtypes(include=[np.number]).iloc[0, -1])  # Positive reward if price goes down
            else:  # Hold
                reward = -0.1  # Small penalty to discourage holding if unnecessary
            done = np.random.rand() > 0.95  # Randomly ending the episode
            action_name = ["Buy Long", "Sell Short", "Hold"][action]
            if reward > 0.7:  # Assuming a reward above 0.7 is a correct prediction
                correct_predictions[action_name] += 1
            else:
                wrong_predictions[action_name] += 1
            state = next_state

    print("Test Data saved with recommendations")
    print("\t\tCorrect Predictions  Wrong Predictions")
    for action in ["Buy Long", "Sell Short", "Hold"]:
        print(f"{action}\t\t{correct_predictions[action]}\t\t\t{wrong_predictions[action]}")

In [11]:
test_model(model)

Test Data saved with recommendations
		Correct Predictions  Wrong Predictions
Buy Long		895			831
Sell Short		0			0
Hold		0			0
