In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import random
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.activations import relu, linear
from keras.layers import Dense, Dropout, Conv1D, MaxPooling2D, Activation, Flatten, Embedding, Reshape,MaxPooling1D,LeakyReLU
!pip install yfinance
import yfinance as yf

In [None]:
# Tickers to download; swap in e.g. "ETH-USD", "XRP-USD" or "LTC-USD" to study another coin.
Crypto_name = ["BTC-USD"]
start_date = "2022-06-20"
end_date = "2023-06-20"

# One column of hourly closing prices per requested ticker.
prices = pd.DataFrame()
for ticker in Crypto_name:
    history = yf.Ticker(ticker).history(start=start_date, end=end_date, interval="1h")
    prices[ticker] = pd.DataFrame(history.Close)

# Plot the closing price history

In [None]:
# Plot the closing price over the downloaded period.
plt.xlabel("date")
plt.ylabel("closing price")
# The space after "from" keeps the start date from running into the word.
plt.title(f"bitcoin closing prices from {start_date} to {end_date}")
plt.plot(prices['BTC-USD'])
plt.show()  # render the figure without echoing the Line2D repr


# Generate the action space

In [None]:
import gym
from gym import spaces
# 51 evenly spaced trade sizes from -20 to +20 (step 0.8, including 0).
# These values are what gets passed to TradingEnv.step: a positive value is an
# amount of capital to spend buying, a negative value an amount to raise by selling.
action_choices = np.linspace(-20, 20, num=51)
print(action_choices)

# Show each action id against the trade size it maps to.
plt.xlabel("action id")
plt.ylabel("action value")
plt.title("generated discrete action space")
plt.scatter(np.arange(len(action_choices)), action_choices)

# Define the DQN agent (double DQN)

In [None]:
class DQNAgent:
    """Epsilon-greedy double-DQN agent backed by a 1-D convolutional Q-network.

    The online network (``model``) selects the greedy next action and the
    periodically synchronised ``target_model`` supplies the value of that
    action when building the training targets.

    Parameters
    ----------
    state_size : int
        Length of the 1-D observation; the network is declared with
        ``input_shape=(state_size, 1)``.
    action_size : int
        Number of discrete actions (width of the Q-value output layer).
    batch_size : int
        Default minibatch size used by ``train`` when none is passed.
    update_target_interval : int, default 100
        Number of completed ``train`` calls between copies of the online
        weights into the target network.
    """

    def __init__(self, state_size, action_size, batch_size, update_target_interval=100):
        self.state_size = state_size
        self.action_size = action_size
        self.memory = ReplayBuffer(1000000, state_size, action_size)
        self.gamma = 0.99            # discount factor applied to the next-state value
        self.epsilon = 1.0           # exploration rate; starts fully random
        self.epsilon_min = 0.1       # floor for the exploration rate
        self.epsilon_decay = 0.995   # multiplicative decay applied per train call
        self.batch_size = batch_size
        self.learning_rate = 0.001

        # Online and target networks start from identical weights.
        self.model = self._build_model()
        self.target_model = self._build_model()
        self.target_model.set_weights(self.model.get_weights())

        self.update_target_interval = update_target_interval
        self.update_target_counter = 0

    def _build_model(self):
        """Build and compile the Q-network (two Conv1D stages, then dense layers)."""
        model = Sequential()
        model.add(Conv1D(128, 8, input_shape=(self.state_size, 1), padding='same'))
        model.add(LeakyReLU())
        model.add(MaxPooling1D(2, padding='same'))
        model.add(Conv1D(64, 8, padding='same'))
        model.add(LeakyReLU())
        model.add(Flatten())
        model.add(Dense(384))
        model.add(Activation('relu'))
        model.add(Dense(256))
        model.add(Activation('relu'))
        # One linear output per action: the predicted Q-value.
        model.add(Dense(self.action_size, activation='linear'))
        # `learning_rate` is the supported keyword; the old `lr` alias is
        # deprecated and rejected by newer Keras releases. No 'accuracy'
        # metric: the network regresses Q-values, so classification accuracy
        # carries no meaning here.
        model.compile(loss="mse", optimizer=Adam(learning_rate=self.learning_rate))
        return model

    def remember(self, state, action, reward, new_state, done):
        """Store one transition in the replay buffer."""
        self.memory.store_transition(state, action, reward, new_state, done)

    def act(self, state, test_mode=False):
        """Return the index of the action to take for ``state``.

        Outside test mode a uniformly random action is returned with
        probability ``epsilon``; otherwise (and always in test mode) the
        action with the highest predicted Q-value is returned.
        """
        if not test_mode and np.random.rand() <= self.epsilon:
            return random.randrange(self.action_size)
        # NOTE(review): the state is fed as (1, state_size) while the network is
        # declared with input_shape (state_size, 1); this relies on Keras
        # reconciling the missing trailing axis - confirm on the installed version.
        act_values = self.model.predict(np.expand_dims(state, axis=0), verbose=0)
        return np.argmax(act_values[0])

    def train(self, batch_size=None, epochs=65):
        """Fit the online network on one sampled minibatch (double-DQN target).

        Parameters
        ----------
        batch_size : int, optional
            Number of transitions to sample; defaults to ``self.batch_size``.
        epochs : int, default 65
            Number of passes ``fit`` makes over the sampled minibatch.

        Does nothing until the buffer has seen at least ``batch_size``
        transitions. After fitting, the target network is refreshed every
        ``update_target_interval`` calls and ``epsilon`` is decayed.
        """
        if batch_size is None:
            batch_size = self.batch_size
        if self.memory.mem_cntr < batch_size:
            return
        state, action, reward, new_state, done = self.memory.sample_buffer(batch_size)

        q_state = self.model.predict(state, verbose=0)
        q_next_online = self.model.predict(new_state, verbose=0)
        q_next_target = self.target_model.predict(new_state, verbose=0)

        # Double DQN: the online network chooses the next action, the target
        # network evaluates it. Terminal transitions keep only the reward.
        best_next = np.argmax(q_next_online, axis=1)
        rows = np.arange(batch_size, dtype=np.int32)
        q_state[rows, action] = reward + self.gamma * q_next_target[rows, best_next] * (1 - done)
        self.model.fit(x=state, y=q_state, verbose=0, epochs=epochs)

        self.update_target_counter += 1
        if self.update_target_counter % self.update_target_interval == 0:
            self.target_model.set_weights(self.model.get_weights())
        if self.epsilon > self.epsilon_min:
            # Clamp so the last decay step cannot undershoot the floor.
            self.epsilon = max(self.epsilon_min, self.epsilon * self.epsilon_decay)

In [None]:
class ReplayBuffer():
    """Fixed-capacity ring buffer of transitions kept in preallocated NumPy arrays."""

    def __init__(self, max_size, input_shape, n_actions):
        """Allocate storage for ``max_size`` transitions with states of width ``input_shape``.

        ``n_actions`` is accepted but not used by the buffer.
        """
        capacity = max_size
        state_block = (capacity, input_shape)

        self.mem_size = capacity
        self.mem_cntr = 0  # total number of transitions stored so far
        self.state_memory = np.zeros(state_block)
        self.new_state_memory = np.zeros(state_block)
        self.action_memory = np.zeros(capacity, dtype=int)
        self.reward_memory = np.zeros(capacity)
        self.terminal_memory = np.zeros(capacity)

    def store_transition(self, state, action, reward, state_, done):
        """Write one transition, overwriting the oldest slot once the buffer is full."""
        slot = self.mem_cntr % self.mem_size
        writes = (
            (self.state_memory, state),
            (self.action_memory, action),
            (self.reward_memory, reward),
            (self.new_state_memory, state_),
            (self.terminal_memory, done),
        )
        for column, value in writes:
            column[slot] = value
        self.mem_cntr += 1

    def sample_buffer(self, batch_size):
        """Return ``(states, actions, rewards, next_states, dones)`` for random filled slots.

        Indices are drawn with ``np.random.choice`` using its default settings,
        so the same slot can appear more than once in a batch.
        """
        filled = min(self.mem_cntr, self.mem_size)
        picks = np.random.choice(filled, batch_size)
        columns = (
            self.state_memory,
            self.action_memory,
            self.reward_memory,
            self.new_state_memory,
            self.terminal_memory,
        )
        return tuple(column[picks] for column in columns)

In [None]:
class TradingEnv(gym.Env):
    """Single-asset trading environment driven by a fixed price series.

    Each observation is a window of ``window_size`` consecutive prices. An
    action is a signed cash amount: positive values buy that much of the
    asset, negative values sell that much, and trades that cannot be funded
    are ignored. The reward is the change in total portfolio value (cash plus
    holdings) between the trade price and the following price.

    Parameters
    ----------
    action_choices : sequence
        The discrete trade sizes; only its length is used here, to size
        ``action_space``.
    init_capital : number, default 1000
        Cash available at the start of every episode.
    stock_price_history : sequence of numbers, optional
        The full price series (list, NumPy array or pandas Series). Defaults
        to an empty series.
    window_size : int, default 30
        Number of prices in each observation.
    """

    def __init__(self, action_choices, init_capital=1000, stock_price_history=None, window_size=30):
        self.init_capital = init_capital  # cash available at the initial step
        self.stock = 0  # units of the asset held (e.g. 0 Bitcoin at start)
        # A None default replaces the shared mutable list default.
        if stock_price_history is None:
            stock_price_history = []
        # Keep a plain array so the integer indexing below is always positional;
        # integer keys on a pandas Series with a non-integer index rely on a
        # fallback that recent pandas versions deprecate.
        self.stock_price_history = np.asarray(stock_price_history)
        self.window_size = window_size  # number of prices shown in each observation
        self.current_step = 0  # start of the current observation window
        self.action_space = spaces.Discrete(len(action_choices))
        self.observation_space = spaces.Box(low=0, high=np.inf, shape=(self.window_size,))
        self.reset()

    def reset(self):
        """Rewind to the start of the series, restore the initial cash, return the first observation."""
        self.current_step = 0
        self.stock = 0
        self.capital = self.init_capital
        return self._next_observation()

    def _next_observation(self):
        """Return a copy of the prices in ``[current_step, current_step + window_size)``."""
        window = self.stock_price_history[self.current_step:self.current_step + self.window_size]
        return np.array(window)

    def step(self, action):
        """Apply one trade and advance the window by one price.

        Parameters
        ----------
        action : number
            Signed cash amount: ``> 0`` buys, ``< 0`` sells, ``0`` holds.

        Returns
        -------
        tuple
            ``(observation, reward, done, new_portfolio_value)``. The fourth
            element is the portfolio value after the step, returned so the
            caller can track progress; it is not an info dictionary.
        """
        trade_index = self.current_step + self.window_size
        stock_price = self.stock_price_history[trade_index]  # price right after the observed window
        portfolio_value = self.capital + self.stock * stock_price  # cash plus holdings before the trade

        if action > 0 and action <= self.capital:
            # Buy: spend `action` cash at the current price.
            self.capital -= action
            self.stock += action / stock_price
        elif action < 0 and (self.stock * stock_price) > (-action):
            # Sell: `action` is negative, so holdings shrink and cash grows.
            self.stock += action / stock_price
            self.capital -= action

        # Value the resulting position at the next price to obtain the reward.
        new_portfolio_value = self.capital + self.stock * self.stock_price_history[trade_index + 1]
        reward = new_portfolio_value - portfolio_value
        self.current_step += 1
        # Stop before a further step could index past the end of the series.
        done = self.current_step + self.window_size + 2 >= len(self.stock_price_history)
        return self._next_observation(), reward, done, new_portfolio_value

In [None]:
# Hold out the last 20% of rows for testing; the first 80% train the agent.
# The split is by position, with no shuffling.
closing_price = prices["BTC-USD"]
split_index = int(len(closing_price) * 0.8)
train_prices = closing_price.iloc[:split_index]
test_prices = closing_price.iloc[split_index:]

In [None]:
# One environment per data split, sharing the same discrete action set.
train_env = TradingEnv(action_choices=action_choices, stock_price_history=train_prices)
test_env = TradingEnv(action_choices=action_choices, stock_price_history=test_prices)

# The network's input and output widths come from the environment's declared spaces.
state_size = train_env.observation_space.shape[0]
action_size = train_env.action_space.n
agent = DQNAgent(
    state_size,
    action_size,
    batch_size=50,
    update_target_interval=100,
)

In [None]:
# Main training loop: replay the training prices for several episodes,
# storing every transition and fitting the agent after each environment step.
N_EPISODES = 10
LOG_EVERY = 10  # environment steps between progress reports

agent_value = []  # portfolio value after every step, accumulated across episodes
for e in range(N_EPISODES):
    state = train_env.reset()
    done = False
    score = 0
    steps = 0
    while not done:
        action = agent.act(state)
        # The agent returns an action index; the environment expects the trade size.
        next_state, reward, done, value = train_env.step(action_choices[action])
        agent.remember(state, action, reward, next_state, done)
        state = next_state
        score += reward
        agent_value.append(value)
        steps += 1
        if steps % LOG_EVERY == 0:
            print(f"step{steps} value so far:{value}   cap:{train_env.capital} st:{train_env.stock} eps:{agent.epsilon}")
            plt.plot(agent_value)
            plt.xlabel("environment step (all episodes)")
            plt.ylabel("portfolio value")
            plt.show()
        # Use the batch size the agent was configured with instead of repeating the literal.
        agent.train(agent.batch_size)
    print(f'Episode {e}, Score(total_reward): {score:.4f}')