# Alpaca Paper Trading

## Imports

#### Base Imports

In [16]:
import warnings
warnings.filterwarnings('ignore')

import sys
import os
sys.path.append("../FinRL/")
sys.path.append("../")

import alpaca_trade_api as tradeapi
from alpaca_trade_api.rest import TimeFrame
import datetime
import time
import numpy as np
import pandas as pd
import threading
import pickle
from stable_baselines3 import A2C
import yfinance as yf
import itertools
from datetime import date, timedelta
import datetime
from stockstats import StockDataFrame as Sdf
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from copy import deepcopy
import empyrical
from contextlib import contextmanager
import sys, os

#### FinRL Imports

In [2]:
from finrl.meta.env_stock_trading.env_stocktrading import StockTradingEnv
from finrl.meta.preprocessor.preprocessors import FeatureEngineer, data_split
from finrl.meta.preprocessor.yahoodownloader import YahooDownloader
from finrl.agents.stablebaselines3.models import DRLAgent
from finrl.plot import get_daily_return

#### Configs

In [3]:
from finrl.config import INDICATORS, TRAINED_MODEL_DIR
from Config.alpacha_config import API_KEY, API_SECRET, API_BASE_URL
from Config.local_config import PCA_INDICATORS
from Config.local_config import DOW_30_TICKERS
import Config.model_kwargs_config as model_kwargs

## Regime Switching 

#### Other Functions

In [4]:
@contextmanager
def suppress_stdout():
    with open(os.devnull, "w") as devnull:
        old_stdout = sys.stdout
        sys.stdout = devnull
        try:  
            yield
        finally:
            sys.stdout = old_stdout

In [5]:
class model_parameters:
    def __init__(self, model_name, kwargs, steps, data, data_retention_level):
        self.model_name = model_name
        self.model = None
        self.sharpe = []
        self.kwargs = kwargs
        self.policy = "MlpPolicy"
        self.policy_kwargs = None
        self.verbose = 0
        self.total_timesteps = steps
        self.data = data
        self.data_retention_level = data_retention_level

#### Data Collection

In [17]:
class run_PCA():
    def __init__(self, indicators, data, tickers) -> None:
        self.data = data
        self.tickers = tickers
        self.indicators = indicators
        self.number_of_components = -1
    
    def run_PCA(self, n_components = .99):
        x = self.data.loc[:, self.indicators].values
        x = StandardScaler().fit_transform(x)
        pca = PCA(n_components=n_components)

        principalComponents = pca.fit_transform(x)
        principalDf = pd.DataFrame(data = principalComponents)

        pickle.dump(pca, open(f"PCA_models/PCA_Model_{str(n_components)}.pickle", 'wb'))

        ret = self.data[['date','tic','open','high','low','close','volume', 'day']]

        for count in range(0, len(principalDf.columns)):
            ret[count] = principalDf[count].values

        ret['vix'] = self.data['vix']
        ret['turbulence'] = self.data['turbulence']
        
        self.number_of_components = len(principalDf.columns)

        ret = ret.fillna(0)

        # ConfusionMatrixDisplay.from_estimator(
        #     principalComponents, display_labels = self.indicators, xticks_rotation = "vertical"
        # )

        return ret

class data_preparation:
    def __init__(self, start_date, end_date, tickers, indicators) -> None:
        self.start_date = start_date
        self.end_date = end_date
        self.tickers = tickers
        self.indicators = indicators

        self.data = YahooDownloader(start_date = self.start_date,
                                    end_date = self.end_date,
                                    ticker_list = tickers).fetch_data()
        self.data.sort_values(['date', 'tic'], ignore_index = True)
        

    def add_indicators(self, vix:bool, turbulence:bool):
        fe = FeatureEngineer(
            use_technical_indicator = True,
            tech_indicator_list = self.indicators, 
            use_vix = vix, # a real time market index representing the markets expectations for volatility over the next 30 days
            use_turbulence = True, # accounts for unexpected rising and falling of the stock market
            user_defined_feature = False)

        processed = fe.preprocess_data(self.data)
        
        list_ticker = processed["tic"].unique().tolist()
        list_date = list(pd.date_range(processed['date'].min(),processed['date'].max()).astype(str))
        combination = list(itertools.product(list_date,list_ticker))

        processed_full = pd.DataFrame(combination,columns=["date","tic"]).merge(processed,on=["date","tic"],how="left")
        processed_full = processed_full[processed_full['date'].isin(processed['date'])]
        processed_full = processed_full.sort_values(['date','tic'])

        self.data = processed_full.fillna(0)
    
    def add_pca(self, vix, turbulence, n_components):
        self.add_indicators(vix, turbulence)
        self.pca = run_PCA(self.indicators, self.data, self.tickers)
        self.data = self.pca.run_PCA(n_components)
    
    def save_data(self, name):
        self.data.to_csv(f"../Datasets/{name}.csv")

#### Regime Switching Algorithm 

In [18]:
class regime_switching():
    def __init__(self, start_date, model_names, account_in):
        self.rebalance_time = 0
        self.training_start_date = start_date
        self.model_names = model_names
        self.account = account_in

        with suppress_stdout():
            self.update_data()

        self.model_kwargs_dict = {
            "a2c": model_kwargs.A2C_KWARGS,
            "ppo": model_kwargs.PPO_KWARGS,
            "sac": model_kwargs.SAC_KWARGS,
            "td3": model_kwargs.TD3_KWARGS,
            "ddpg": model_kwargs.DDPG_KWARGS
        }

        self.create_alg_variations()

    def run_regime_switching(self):
        unique_train_dates = self.algs[0].data[
            (self.algs[0].data.date < self.todays_date)
        ].date.unique()
        

        for model_params in self.algs:
            training_env, e_val_gym = self.create_environments(unique_train_dates[-63], model_params.data)
            print(f"Training {model_params.model_name}_{model_params.data_retention_level}")
            trained_model = self.train_algs(training_env, model_params)
            model_params.model = trained_model
            model_params.sharpe.append(self.val_algs(e_val_gym, trained_model))
        
        best_model = self.select_best_model()

        return best_model

    def update_data(self):
        self.todays_date = datetime.today().strftime('%Y-%m-%d')
        self.data_85 = self.get_data(0.85)
        self.data_99 = self.get_data(0.99)
    
    def get_data(self, n_components):
        pcaData = data_preparation(self.training_start_date, self.todays_date, DOW_30_TICKERS, PCA_INDICATORS)
        pcaData.add_pca(vix=True, turbulence=True, n_components = n_components)

        return pcaData.data
    
    def create_alg_variations(self):
        self.algs = []
        for name in self.model_names:
            self.algs.append(model_parameters(model_name = name,
                                              kwargs = deepcopy(self.model_kwargs_dict[name]),
                                              steps = deepcopy(model_kwargs.TIMESTEPS_DICT[name]),
                                              data = self.data_85,
                                              data_retention_level = "85"))
            self.algs.append(model_parameters(model_name = name,
                                              kwargs = deepcopy(self.model_kwargs_dict[name]),
                                              steps = deepcopy(model_kwargs.TIMESTEPS_DICT[name]),
                                              data = self.data_99,
                                              data_retention_level = "99"))
    
    def create_environments(self, train_end, data):
        train_data = data_split(
            data,
            start = self.training_start_date,
            end = train_end
        )

        val_data = data_split(
            data,
            start = train_end,
            end = self.todays_date
        )

        kwargs = {
            'hmax': self.account.hmax,
            "initial_amount": self.account.cash,
            "num_stock_shares": [0] * self.account.stock_dimension,
            "buy_cost_pct": self.account.buy_cost_list,
            "sell_cost_pct": self.account.sell_cost_list,
            "state_space": 1 + 2 * self.account.stock_dimension + len(train_data.columns[7:-2]) * self.account.stock_dimension,
            "stock_dim": self.account.stock_dimension,
            "tech_indicator_list": train_data.columns[7:-2],
            "action_space": self.account.stock_dimension,
            "reward_scaling": 1e-4
        }

        e_train_gym = StockTradingEnv(df = train_data, **kwargs)
        e_val_gym = StockTradingEnv(df = val_data, turbulence_threshold = 30, risk_indicator_col='vix', **kwargs)

        training_env, _ = e_train_gym.get_sb_env()

        return training_env, e_val_gym
    
    def train_algs(self, training_env, model_params):
        with suppress_stdout():
            agent = DRLAgent(env = training_env)
            model = agent.get_model(
                model_params.model_name,
                policy = model_params.policy,
                policy_kwargs = model_params.policy_kwargs,
                model_kwargs = model_params.kwargs
            )
            trained_model = agent.train_model(model = model, tb_log_name = model_params.model_name, total_timesteps = model_params.total_timesteps)

        return trained_model

    def val_algs(self, e_val_env, trained_model):
        with suppress_stdout():
            df_account_value, df_actions, _ = DRLAgent.DRL_prediction(
                model = trained_model, 
                environment = e_val_env
            )

        df = deepcopy(df_account_value)
        df["date"] = pd.to_datetime(df["date"])
        bt_returns = get_daily_return(df, value_col_name="account_value")

        return empyrical.sharpe_ratio(bt_returns)

    def select_best_model(self):
        best_model_sharpe = float('-inf')
        for model in self.algs:
            if model.sharpe[-1] > best_model_sharpe:
                best_model_sharpe = model.sharpe[-1]
                best_model = model
                best_model_name = model.model_name
        
        return best_model

## Alpaca Trading


#### Account

In [19]:
class account():
    def __init__(self, ticker_list, cash, num_stock_shares):
        stock_dimension = len(ticker_list)
        self.cash = cash
        self.ticker_list = ticker_list
        self.num_stock_shares = num_stock_shares
        self.buy_cost_list = [0.001] * stock_dimension
        self.sell_cost_list = [0.001] * stock_dimension
        self.stock_dimension = stock_dimension
        self.hmax = 100

#### Getting Live Data

In [20]:
class GetData:
    def __init__(self, time_interval, alpaca, tech_indicator_list, PCA_path):
        self.time_interval = time_interval
        self.alpaca = alpaca
        self.tech_indicator_list = tech_indicator_list
        self.PCA_path = PCA_path

        # PCA
        if self.PCA_path != None:
            with open(self.PCA_path, 'rb') as fp:
                self.PCA_model = pickle.load(fp)
    
    def download_live_data(self):
        print("Fetching updated data...")
        data_df = pd.DataFrame()

        # ========= Getting Data Using Alpaca =========
        if self.time_interval == TimeFrame.Day:
            days = 60
        else:
            days = 5
            
        for tic in DOW_30_TICKERS:
            df = self.alpaca.get_bars(
                symbol=tic,
                start=datetime.date.today() - datetime.timedelta(days = 300),
                timeframe=self.time_interval,
                adjustment="raw",
            ).df.tz_convert("US/Eastern").tail(300)
            df["tic"] = tic
            data_df = pd.concat([data_df, df])
        data_df = data_df.reset_index()

        # ========= Adding Technical Indicators =========
        data_df["day"] = data_df["timestamp"].dt.dayofweek
        data_df = data_df.dropna()
        data_df = data_df.reset_index(drop=True)
        data_df["timestamp"] = data_df.timestamp.apply(lambda x: x.strftime("%Y-%m-%d %H:%M:%S"))
        data_df = data_df.sort_values(by=["timestamp", "tic"]).reset_index(drop=True)
        df = self.add_technical_indicator(data_df)
        df = df.fillna(method="ffill").fillna(method="bfill")

        # ========= Calculating Turbulence =========
        turb_df = self.alpaca.get_bars(
            symbol="VIXY",
            start=datetime.date.today() - datetime.timedelta(days = 1),
            timeframe=self.time_interval,
            adjustment="raw",
        ).df.tz_convert("US/Eastern").tail(1)
        latest_turb = turb_df["close"].values

        # ========= Applying PCA =========
        if self.PCA_path != None:
            df = df.reset_index(drop=True)
            x = df.loc[:, PCA_INDICATORS].values
            x = StandardScaler().fit_transform(x)
            new = self.PCA_model.transform(x)
            principalDf = pd.DataFrame(data = new)

            ret = df[['date','tic','open','high','low','close','volume', 'day']]
            for count in range(0, len(principalDf.columns)):
                ret[count] = principalDf[count].values
            self.number_of_pca_comps = len(principalDf.columns)
            df = ret
            
        # ========= Data Formatting =========
        start_date = df["date"].iloc[0]
        end_date = df["date"].iloc[-1]
        # print("start: ", start_date)
        # print("start: ", end_date)
        data = data_split(df, start_date, end_date)
        data = data[data.index == data.index.unique()[-1]]
        print(f"Retrieved data for timestamp: {data['date'].unique()[0]}")

        return data, latest_turb[0]

    def add_technical_indicator(self, df):
        df = df.rename(columns={"timestamp": "date"})
        df = df.copy()
        df = df.sort_values(by=["tic", "date"])
        
        stock = Sdf.retype(df.copy())
        unique_ticker = stock.tic.unique()
        tech_indicator_list = self.tech_indicator_list

        for indicator in tech_indicator_list:
            indicator_df = pd.DataFrame()
            for i in range(len(unique_ticker)):
                # print(unique_ticker[i], i)
                temp_indicator = stock[stock.tic == unique_ticker[i]][indicator]
                temp_indicator = pd.DataFrame(temp_indicator)
                temp_indicator["tic"] = unique_ticker[i]
                # print(len(df[df.tic == unique_ticker[i]]['date'].to_list()))
                temp_indicator["date"] = df[df.tic == unique_ticker[i]]["date"].to_list()
                indicator_df = pd.concat(
                    [indicator_df, temp_indicator], ignore_index=True
                )
            df = df.merge(
                indicator_df[["tic", "date", indicator]], on=["tic", "date"], how="left"
            )
        df = df.sort_values(by=["date", "tic"])
        return df

#### Main Class

In [21]:
class AlpacaPaperTrading():
    def __init__(self, ticker_list, time_interval, API_KEY, API_SECRET, API_BASE_URL, tech_indicator_list, turbulence_thresh = 30, 
                 latency = None, sell_at_end_of_day = False, pca_path = None):
        
        self.sell_at_end_of_day = sell_at_end_of_day
        self.pca_path = pca_path
        self.rebalance_counter = 0

        # Alpaca
        self.API_KEY = API_KEY
        self.API_BASE_URL = API_BASE_URL
        self.API_SECRET = API_SECRET
        self.AlpacaConnection()

        # Account
        num_stock_shares = [0] * len(ticker_list)
        for tic in range(len(ticker_list)):
            try:
                num_stock_shares[tic] = self.alpaca.get_position(ticker_list[tic]).qty_available
            except:
                num_stock_shares[tic] = 0

        self.account = account(ticker_list, float(self.alpaca.get_account().cash), num_stock_shares)
        
        # Getting Data
        self.data_retrieve = GetData(time_interval, self.alpaca, tech_indicator_list, pca_path)

        # Trading Settings
        self.tech_indicator_list = tech_indicator_list
        self.turbulence_thresh = turbulence_thresh
        self.time_interval = time_interval
        self.sell_time = 60

        # Sleep time 
        if time_interval == TimeFrame.Day:
            self.sleep_time = 86400 * time_interval.amount
        elif time_interval == TimeFrame.Hour:
            self.sleep_time = 3600 * time_interval.amount
        elif time_interval == TimeFrame.Minute:
            self.sleep_time = 60 * time_interval.amount
        
    def AlpacaConnection(self):
        try:
            self.alpaca = tradeapi.REST(self.API_KEY, self.API_SECRET, self.API_BASE_URL, 'v2')
        except:
            raise ValueError('Fail to connect Alpaca. Please check account info and internet connection')
    
    def awaitMarketOpen(self):
        isOpen = self.alpaca.get_clock().is_open
        while(not isOpen):
            clock = self.alpaca.get_clock()
            openingTime = clock.next_open.replace(tzinfo=datetime.timezone.utc).timestamp()
            currTime = clock.timestamp.replace(tzinfo=datetime.timezone.utc).timestamp()
            timeToOpen = int((openingTime - currTime) / 60)
            print(str(timeToOpen) + ' minutes till market open.')
            time.sleep(60)
            isOpen = self.alpaca.get_clock().is_open
    
    def testLatency(self, test_times = 10): 
        total_time = 0
        for i in range(0, test_times):
            time0 = time.time()
            self.get_state()
            time1 = time.time()
            temp_time = time1 - time0
            total_time += temp_time
        latency = total_time/test_times
        print('latency for data processing: ', latency)
        return latency
    
    def submitOrder(self, qty, stock, side, resp):
        if(qty > 0):
            try:
                self.alpaca.submit_order(str(stock), str(qty), side, "market", "day")
                print(f"Market order of | {str(qty)} {stock} {side} | Completed")
                resp.append(True)
            except Exception as e:
                print(f"Market order of | {str(qty)} {stock} {side} | Failed")
                resp.append(False)
                print(e)
        else:
            print(f"Quantity is 0, order of | {str(qty)} {stock} {side} | Insufficient Quantity")
            resp.append(False)
    
    def sell_stock(self, action, index, state):
        if state[index + 2 * self.account.stock_dimension + 1] != True:
            if state[index + self.account.stock_dimension + 1] > 0:
                sell_num_shares = min(
                    int(abs(action)),  int(self.account.num_stock_shares[index])
                )

                respSO = []
                tSubmitOrder = threading.Thread(
                    target=self.submitOrder(
                        sell_num_shares, # quantity
                        self.account.ticker_list[index], # asset
                        'sell', 
                        respSO
                    )
                )
                tSubmitOrder.start()
                tSubmitOrder.join()
                if respSO[-1] == True:
                    self.account.cash = float(self.alpaca.get_account().cash)
                    if int(sell_num_shares) == int(self.account.num_stock_shares[index]):
                        self.account.num_stock_shares[index] = 0
                    else:
                        self.account.num_stock_shares[index] -= sell_num_shares

    def buy_stock(self, action, index, state):
        if state[index + 2 * self.account.stock_dimension + 1] != True:
            if float(self.alpaca.get_account().cash) > 0:

                available_amount =  float(self.alpaca.get_account().cash) // (state[index + 1] * (1 + self.account.buy_cost_list[index]))
                buy_num_shares = int(min(available_amount, action))

                respSO = []
                tSubmitOrder = threading.Thread(
                    target=self.submitOrder(
                        buy_num_shares, # quantity
                        self.account.ticker_list[index], # asset
                        'buy', 
                        respSO
                    )
                )
                tSubmitOrder.start()
                tSubmitOrder.join()

                if respSO[-1] == True:
                    self.account.cash = float(self.alpaca.get_account().cash)
                    self.account.num_stock_shares[index] += buy_num_shares
    
    def get_state(self):
        print("Creating state...")
        # get current data
        data, turb = self.data_retrieve.download_live_data()
        self.current_turbulence = turb

        if self.pca_path == None:
            ind_cols = data.loc[:, PCA_INDICATORS].columns
        else:
            ind_cols = list(range(0,self.data_retrieve.number_of_pca_comps))
            # ind_cols.append("day")
            
        state = (
            [self.account.cash] # starting cash
            + data.close.values.tolist() # all close prices
            + self.account.num_stock_shares # number of assets owned per stock 
            + sum(
                (
                    data[tech].values.tolist()
                    for tech in ind_cols
                ),
                [],
            )
        )
        state = np.array([state])
        state = state.astype('float64')
        return state

    def trade(self):
        state = self.get_state()
        action, _states = self.agent.predict(state, deterministic = True)
        
        action = action * self.account.hmax
        action = action.astype(int) 
        print(action)
        
        if self.current_turbulence < self.turbulence_thresh:
            argsort_actions = np.argsort(action[0])
            sell_index = argsort_actions[: np.where(action < 0)[0].shape[0]]
            buy_index = argsort_actions[::-1][: np.where(action > 0)[0].shape[0]]

            for index in sell_index:
                self.sell_stock(action[0][index], index, state[0])
                time.sleep(5)
            
            for index in buy_index:
                self.buy_stock(action[0][index], index, state[0])
                time.sleep(5)
            
        else:
            print("Turbulence is to high to trade ...")
            print("Skipping trading cycle")

    def select_new_alg(self):
        # Agent
        self.select_alg = regime_switching('2010-01-01', ["a2c", "td3", "ddpg", "sac", "ppo"], account_in = self.account)
        self.agent = self.select_alg.run_regime_switching().model

    def start(self):
        # Ensure orders are clear
        orders = self.alpaca.list_orders(status = "open")
        for order in orders:
            self.alpaca.cancel_order(order.id)
        
        # self.select_new_alg()
        
        # Awaiting Market Open
        print("Waiting for market to open...")
        tAMO = threading.Thread(target=self.awaitMarketOpen) # setting function to a different thread so it can run in the background
        tAMO.start()
        tAMO.join()
        print("Market Opened") 

        while True:
            # Market Closing time
            clock = self.alpaca.get_clock()
            closingTime = clock.next_close.replace(tzinfo=datetime.timezone.utc).timestamp()
            currTime = clock.timestamp.replace(tzinfo=datetime.timezone.utc).timestamp()
            self.timeToClose = closingTime - currTime

            if self.timeToClose < (self.sell_time):
                if self.sell_at_end_of_day == False:
                    # Stop Trading
                    print(f"Market closing in {self.sell_time} seconds. Stopping Trading")
                    break

                else:
                    # Sell All
                    print(f"Market closing in {self.sell_time} seconds. Closing Positions")

                    positions = self.alpaca.list_positions()
                    for position in positions:
                        if position.side == 'long':
                            orderSide = 'sell'
                        else:
                            orderSide = 'buy'
                        
                        qty = abs(int(float(position.qty)))
                        respSO = []
                        tSubmitOrder = threading.Thread(target=self.submitOrder(qty, position.symbol, orderSide, respSO))
                        tSubmitOrder.start()
                        tSubmitOrder.join()

                    time.sleep(60 * 10)

            else:
                trade = threading.Thread(target=self.trade)
                trade.start()
                trade.join()
                # last_equity = float(self.alpaca.get_account().last_equity)
                # self.account.equities.append([time.time(), last_equity])
                
                time.sleep(120)
                self.rebalance_counter += 1
                if self.rebalance_counter == 63:
                    self.rebalance_counter = 0
                    self.select_new_alg()
                self.account.cash = float(self.alpaca.get_account().cash)

#### Running

In [22]:
paper_trader = AlpacaPaperTrading(ticker_list = DOW_30_TICKERS, 
                                  time_interval = TimeFrame.Day,
                                  API_KEY = API_KEY, 
                                  API_SECRET = API_SECRET, 
                                  API_BASE_URL = API_BASE_URL, 
                                  tech_indicator_list = PCA_INDICATORS, 
                                  turbulence_thresh = 70, 
                                  latency = None,
                                  pca_path = r"D:\University\Thesis\Multi Algorithm Soluction\trained_models\PCA_Model_0.9.pickle"
                                )

In [23]:
paper_trader.start()

Waiting for market to open...
4536 minutes till market open.
4535 minutes till market open.
