# In [None]:
import pandas as pd
import numpy as np
import random
import tensorflow as tf
from tensorflow.keras import layers, models, optimizers
import yfinance as yf
import matplotlib.pyplot as plt
import gym

# Raise the TF v1-compat logger threshold to ERROR so lower-severity messages
# are not printed while the script runs.
tf.compat.v1.logging.set_verbosity(tf.compat.v1.logging.ERROR)

# Define the environment
class StockMarketEnv(gym.Env):
    """Gym environment that replays the daily returns of a single ticker.

    The observation is a ``(1, 1)`` array holding the return of the current
    trading day.  The action is a length-1 sequence; ``action[0]`` is
    interpreted as a relative adjustment applied to today's close to form a
    "predicted close".  The reward is ``signal * today's return`` where
    ``signal`` is +1 / -1 / 0 depending on whether the predicted close is
    above / below / equal to the previous trading day's close.

    Every predicted close is also written to ``self.df['predicted_close']`` so
    it can be plotted or scored afterwards.

    Args:
        ticker: Symbol passed to ``yfinance.download``.
        start_date: First date (``YYYY-MM-DD``) of the download window.
        end_date: End date (``YYYY-MM-DD``) of the download window.

    Raises:
        ValueError: If fewer than two usable price rows were downloaded, since
            at least one return is needed to build an observation.
    """

    def __init__(self, ticker='AAPL', start_date='2020-01-01', end_date='2022-01-01'):
        super().__init__()
        self.start_date = start_date
        self.end_date = end_date
        self.ticker = ticker

        prices = yf.download(self.ticker, start=self.start_date, end=self.end_date)
        # Some yfinance releases return (field, ticker) MultiIndex columns even
        # for a single ticker; flatten them so df['Close'] is a plain Series.
        if isinstance(prices.columns, pd.MultiIndex):
            prices.columns = prices.columns.get_level_values(0)
        prices = prices.dropna().copy()
        if len(prices) < 2:
            raise ValueError(
                f"Not enough price data for {self.ticker!r} between "
                f"{self.start_date} and {self.end_date}"
            )

        prices['Returns'] = prices['Close'].pct_change()
        # Capture the previous close BEFORE dropping the first row, so step 0
        # still has a genuine "yesterday" instead of wrapping to the last row.
        previous_close = prices['Close'].shift(1)

        # .copy() makes self.df an independent frame; step() adds a column to it.
        self.df = prices.iloc[1:].copy()
        self._prev_close = previous_close.iloc[1:].to_numpy()

        self.observations = len(self.df)
        self.state_shape = 1
        self.action_shape = 1
        # Initialise here as well so step() works even if reset() was not called.
        self.current_step = 0
        self.state = self._observation(0)

    def _observation(self, index):
        """Return the ``(1, 1)`` observation for the row at position *index*."""
        return np.array([[self.df['Returns'].iloc[index]]])

    def step(self, action):
        """Advance one trading day.

        Args:
            action: Sequence whose first element is the relative adjustment
                applied to today's close.

        Returns:
            Tuple ``(state, reward, done, info)``.  When ``done`` is True the
            returned state is the last valid observation (it is not advanced).
        """
        position = self.current_step
        last_period_close = self._prev_close[position]
        today_close = self.df['Close'].iloc[position]
        today_return = self.df['Returns'].iloc[position]

        predicted_close = today_close * (1 + action[0])
        self.df.loc[self.df.index[position], 'predicted_close'] = predicted_close

        signal = 0
        if predicted_close > last_period_close:
            signal = 1
        elif predicted_close < last_period_close:
            signal = -1
        reward = signal * today_return

        self.current_step += 1
        done = self.current_step >= self.observations
        if not done:
            self.state = self._observation(self.current_step)
        return self.state, reward, done, {}

    def reset(self):
        """Rewind to the first trading day and return its observation."""
        self.current_step = 0
        self.state = self._observation(0)
        return self.state

    def render(self, mode='human'):
        """No-op; plotting is done by the agent."""
        pass

    def close(self):
        """No-op; the environment holds no external resources."""
        pass

# Define the agent
class DDPGAgent:
    """Deep Deterministic Policy Gradient agent for ``StockMarketEnv``.

    Holds an actor (``policy_model``), a critic (``q_model``), slowly-tracking
    target copies of both, and a bounded replay buffer.

    Args:
        env: Environment exposing ``state_shape``, ``action_shape``,
            ``reset()``, ``step(action)``, ``render()`` and a ``df`` frame with
            ``Close``, ``Returns`` and (after stepping) ``predicted_close``.
        gamma: Discount factor used in the Bellman target.
        tau: Interpolation factor for the soft target-network update.
        buffer_size: Maximum number of transitions kept in the replay buffer;
            the oldest transition is overwritten once it is full.
        batch_size: Number of transitions sampled per update.  No learning
            happens until the buffer holds at least this many transitions.

    Raises:
        ValueError: If ``batch_size`` exceeds ``buffer_size`` (the buffer could
            never hold enough transitions to sample a batch).
    """

    def __init__(self, env, gamma=0.99, tau=0.001, buffer_size=100000, batch_size=6400):
        if batch_size > buffer_size:
            raise ValueError("batch_size must not exceed buffer_size")
        self.env = env
        self.gamma = gamma
        self.tau = tau
        self.buffer_size = buffer_size
        self.batch_size = batch_size
        self.replay_buffer = []
        # Next slot to overwrite once the buffer is full (ring-buffer cursor).
        self._buffer_pos = 0

        self.q_model = models.Sequential([
            layers.Dense(64, activation='relu', input_shape=(env.state_shape + env.action_shape,)),
            layers.Dense(32, activation='relu'),
            layers.Dense(1)
        ])
        self.policy_model = models.Sequential([
            layers.Dense(64, activation='relu', input_shape=(env.state_shape,)),
            layers.Dense(32, activation='relu'),
            layers.Dense(env.action_shape, activation='tanh')
        ])

        # Target networks start as exact copies of the online networks.
        # clone_model re-initialises weights, so copy them over explicitly.
        self.target_q_model = models.clone_model(self.q_model)
        self.target_q_model.set_weights(self.q_model.get_weights())
        self.target_policy_model = models.clone_model(self.policy_model)
        self.target_policy_model.set_weights(self.policy_model.get_weights())

        self.q_optimizer = optimizers.Adam(learning_rate=0.001)
        self.policy_optimizer = optimizers.Adam(learning_rate=0.001)

    @staticmethod
    def _soft_update(target_model, source_model, tau):
        """Move *target_model* weights a fraction *tau* toward *source_model*."""
        blended = [
            tau * source_w + (1 - tau) * target_w
            for source_w, target_w in zip(source_model.get_weights(), target_model.get_weights())
        ]
        target_model.set_weights(blended)

    def _store_transition(self, transition):
        """Add *transition* to the replay buffer, overwriting the oldest when full."""
        if len(self.replay_buffer) < self.buffer_size:
            self.replay_buffer.append(transition)
        else:
            self.replay_buffer[self._buffer_pos] = transition
        self._buffer_pos = (self._buffer_pos + 1) % self.buffer_size

    def _act(self, state):
        """Return the actor's action for *state*, clipped to ``[-1, 1]``."""
        observation = np.asarray(state, dtype=np.float32).reshape(1, -1)
        # Calling the model directly avoids predict()'s per-call overhead and
        # progress output when acting one step at a time.
        action = self.policy_model(observation, training=False).numpy()[0]
        return np.clip(action, -1, 1)

    def ddpg(self, state, action, reward, next_state, done):
        """Record one transition and, once enough are stored, run one DDPG update.

        Args:
            state: Observation before the action.
            action: Action taken.
            reward: Scalar reward received.
            next_state: Observation after the action.
            done: Whether the episode ended with this transition.
        """
        self._store_transition((state, action, reward, next_state, done))

        # Wait until a full batch can be sampled.
        if len(self.replay_buffer) < self.batch_size:
            return
        batch = random.sample(self.replay_buffer, self.batch_size)
        count = len(batch)

        # Flatten everything to 2-D float32 so states/actions can be concatenated
        # on axis 1 and rewards/dones line up with the critic's (B, 1) output
        # instead of broadcasting to (B, B).
        states = np.asarray([t[0] for t in batch], dtype=np.float32).reshape(count, -1)
        actions = np.asarray([t[1] for t in batch], dtype=np.float32).reshape(count, -1)
        rewards = np.asarray([t[2] for t in batch], dtype=np.float32).reshape(count, 1)
        next_states = np.asarray([t[3] for t in batch], dtype=np.float32).reshape(count, -1)
        dones = np.asarray([t[4] for t in batch], dtype=np.float32).reshape(count, 1)

        # Bellman target from the TARGET networks; it is a constant with respect
        # to the critic's loss, hence computed outside the tape and stopped.
        target_actions = self.target_policy_model(next_states)
        target_q_values = self.target_q_model(tf.concat([next_states, target_actions], axis=1))
        target_y = tf.stop_gradient(rewards + self.gamma * (1.0 - dones) * target_q_values)

        # Update the Q-network
        with tf.GradientTape() as tape:
            current_q_values = self.q_model(tf.concat([states, actions], axis=1))
            loss = tf.reduce_mean(tf.square(current_q_values - target_y))
        grads = tape.gradient(loss, self.q_model.trainable_weights)
        self.q_optimizer.apply_gradients(zip(grads, self.q_model.trainable_weights))

        # Update the policy network: ascend the critic's value of the actor's actions
        with tf.GradientTape() as tape:
            new_actions = self.policy_model(states)
            q_values = self.q_model(tf.concat([states, new_actions], axis=1))
            policy_loss = -tf.reduce_mean(q_values)
        grads = tape.gradient(policy_loss, self.policy_model.trainable_weights)
        self.policy_optimizer.apply_gradients(zip(grads, self.policy_model.trainable_weights))

        # Update the target networks
        self._soft_update(self.target_q_model, self.q_model, self.tau)
        self._soft_update(self.target_policy_model, self.policy_model, self.tau)

    def train(self, episodes):
        """Run *episodes* full passes over the environment, learning after each step."""
        for episode in range(episodes):
            state = self.env.reset()
            episode_reward = 0
            done = False
            while not done:
                action = self._act(state)
                next_state, reward, done, _ = self.env.step(action)
                self.ddpg(state, action, reward, next_state, done)
                episode_reward += reward
                state = next_state
            print(f"Episode: {episode+1}, Reward: {episode_reward}")

        print("Training complete!")

    def test(self):
        """Run one pass over the environment with the current policy, without learning."""
        state = self.env.reset()
        done = False
        while not done:
            action = self._act(state)
            state, _, done, _ = self.env.step(action)
        print("Testing complete!")

    def plot_results(self):
        """Plot the actual close against the predicted close recorded by the environment."""
        self.env.render()
        plt.plot(self.env.df['Close'], label='Actual Close')
        plt.plot(self.env.df['predicted_close'], label='Predicted Close')
        plt.legend()
        plt.show()

    def calculate_metrics(self):
        """Score the predictions stored in ``env.df``.

        Returns:
            Tuple ``(win_rate, maximum_drawdown, net_profit, loss_rate)``:
            ``win_rate`` is the fraction of days (from the second row on) where
            the sign of the predicted-close change matches the sign of the
            actual return; ``maximum_drawdown`` is the largest relative drop of
            the actual close from its running maximum; ``net_profit`` is the
            cumulative product of ``1 + pct_change`` of the predicted close;
            ``loss_rate`` is ``1 - win_rate``.
        """
        frame = self.env.df
        returns = frame['Returns'].values
        predicted_change = frame['predicted_close'].pct_change()
        predicted_returns = predicted_change.values[1:]
        win_rate = (np.sign(returns[1:]) == np.sign(predicted_returns)).mean()
        maximum_drawdown = (1 - (frame['Close'] / frame['Close'].cummax())).max()
        # .iloc[-1]: the frame is date-indexed, so [-1] would be a label lookup.
        net_profit = (predicted_change + 1).cumprod().iloc[-1]
        loss_rate = 1 - win_rate
        return win_rate, maximum_drawdown, net_profit, loss_rate

# Build the market environment and the agent that acts in it
env = StockMarketEnv()
agent = DDPGAgent(env)

# Run the training episodes
episode_count = 100
agent.train(episode_count)

# Evaluate the learned policy, then chart actual vs. predicted closes
agent.test()
agent.plot_results()

# Compute and report the summary metrics
win_rate, maximum_drawdown, net_profit, loss_rate = agent.calculate_metrics()
for label, value in (
    ("Win Rate", win_rate),
    ("Maximum Drawdown", maximum_drawdown),
    ("Net Profit", net_profit),
    ("Loss Rate", loss_rate),
):
    print(f"{label}: {value}")

# Output: [*********************100%***********************]  1 of 1 completed
