# In [1]:
import numpy as np
import pandas as pd

# Define the environment
class PortfolioEnvironment:
    """Single-asset trading environment stepped along a fixed price series.

    Actions understood by :meth:`step`:

    * ``1`` -- buy with the entire cash balance (fees included),
    * ``2`` -- sell every share currently held,
    * anything else -- leave the position unchanged.

    Observations are ``np.array([balance, num_shares, current_price])``.
    """

    def __init__(self, prices, initial_balance=10000, transaction_cost=0.001):
        """Store the configuration and put the environment in its start state.

        Args:
            prices: Indexable price series; one entry per time step.
            initial_balance: Cash available at the start of every episode.
            transaction_cost: Proportional fee applied to the traded value
                on both buys and sells (0.001 means 0.1%).
        """
        self.prices = prices
        self.initial_balance = initial_balance
        self.transaction_cost = transaction_cost
        self.reset()

    def reset(self):
        """Start a new episode and return the first observation."""
        self.balance = self.initial_balance
        self.num_shares = 0
        self.current_step = 0
        self.done = False
        return self._get_observation()

    def step(self, action):
        """Apply ``action`` at the current price and advance one time step.

        Args:
            action: ``1`` to buy, ``2`` to sell; any other value is a no-op.

        Returns:
            A tuple ``(observation, reward, done)``. ``reward`` is the
            portfolio value (cash plus shares marked at the new step's
            price) minus ``initial_balance``.

        Raises:
            RuntimeError: If called after the episode finished without an
                intervening :meth:`reset`.
        """
        if self.done:
            raise RuntimeError("Environment is done, please reset.")

        current_price = self.prices[self.current_step]

        # Buy: only when at least one share's price is covered and the price
        # is positive (a zero price would divide by zero below).
        if action == 1 and current_price > 0 and self.balance > current_price:
            # Size the order so that price plus fee consumes exactly the
            # available cash; the balance must never go negative and shares
            # already held must not be paid for again.
            cost_per_share = current_price * (1 + self.transaction_cost)
            self.num_shares += self.balance / cost_per_share
            self.balance = 0.0

        # Sell: liquidate the whole position, net of the fee.
        elif action == 2 and self.num_shares > 0:
            self.balance += self.num_shares * current_price * (1 - self.transaction_cost)
            self.num_shares = 0

        self.current_step += 1
        # The episode ends on the last price so that the observation below
        # can still index ``prices[current_step]``.
        if self.current_step >= len(self.prices) - 1:
            self.done = True

        portfolio_value = self.balance + self.num_shares * self.prices[self.current_step]
        reward = portfolio_value - self.initial_balance

        return self._get_observation(), reward, self.done

    def _get_observation(self):
        """Return ``[balance, num_shares, price at current step]`` as an array."""
        return np.array([self.balance, self.num_shares, self.prices[self.current_step]])

# Define Q-Learning agent
class QLearningAgent:
    """Tabular epsilon-greedy Q-learning agent backed by a plain dict.

    Q-values are stored sparsely under ``(tuple(state), action)`` keys;
    any pair that has never been updated reads as ``0.0``.
    """

    def __init__(self, n_actions, learning_rate=0.1, discount_factor=0.95, exploration_rate=1.0, exploration_decay=0.99):
        """Record the hyper-parameters and start with an empty Q-table."""
        self.q_table = {}
        self.n_actions = n_actions
        self.learning_rate = learning_rate
        self.discount_factor = discount_factor
        self.exploration_rate = exploration_rate
        self.exploration_decay = exploration_decay

    def get_q_value(self, state, action):
        """Return the stored Q-value for ``(state, action)``, or 0.0 if unseen."""
        key = (tuple(state), action)
        if key in self.q_table:
            return self.q_table[key]
        return 0.0

    def choose_action(self, state):
        """Pick an action: random with probability ``exploration_rate``, else greedy."""
        explore = np.random.random() < self.exploration_rate
        if explore:
            return np.random.randint(self.n_actions)

        # Greedy branch: first action with the highest stored value wins ties.
        action_values = [self.get_q_value(state, candidate) for candidate in range(self.n_actions)]
        return np.argmax(action_values)

    def learn(self, state, action, reward, next_state):
        """Move Q(state, action) toward ``reward + discount * max Q(next_state, .)``."""
        old_estimate = self.get_q_value(state, action)
        best_next = max(
            self.get_q_value(next_state, candidate)
            for candidate in range(self.n_actions)
        )
        td_error = reward + self.discount_factor * best_next - old_estimate
        self.q_table[(tuple(state), action)] = old_estimate + self.learning_rate * td_error

    def update_exploration(self):
        """Shrink the exploration rate by the multiplicative decay factor."""
        self.exploration_rate = self.exploration_rate * self.exploration_decay

# Synthetic stand-in for historical prices (replace with your own data):
# 100 uniform random values scaled into [0, 100).
prices = np.random.rand(100) * 100

# Build the trading environment and a three-action agent
# (1 = buy, 2 = sell, 0 = no trade, as interpreted by the environment).
env = PortfolioEnvironment(prices)
agent = QLearningAgent(n_actions=3)

# Training: each episode is one full pass over the price series.
n_episodes = 1000
for episode in range(n_episodes):
    state = env.reset()
    done = False
    while not done:
        action = agent.choose_action(state)
        next_state, reward, done = env.step(action)
        agent.learn(state, action, reward, next_state)
        state = next_state
    # Decay the exploration rate once per completed episode.
    agent.update_exploration()

# Evaluation: run one more episode with the trained agent. Action selection
# still goes through choose_action, so the decayed exploration rate applies.
final_state = env.reset()
while not env.done:
    action = agent.choose_action(final_state)
    final_state, _, _ = env.step(action)

# Mark the remaining position to market at the last step's price.
cash, shares = final_state[0], final_state[1]
final_balance = cash + shares * prices[env.current_step]
print(f"Final portfolio value: ${final_balance:.2f}")


# Output: Final portfolio value: $19436315.27
