### Environment Setting

In [1]:
%pip install websocket-client
%pip install finnhub-python

Collecting websocket-client
  Downloading websocket_client-1.2.3-py3-none-any.whl (53 kB)
[?25l[K     |██████▏                         | 10 kB 17.0 MB/s eta 0:00:01[K     |████████████▎                   | 20 kB 8.0 MB/s eta 0:00:01[K     |██████████████████▍             | 30 kB 7.0 MB/s eta 0:00:01[K     |████████████████████████▌       | 40 kB 6.4 MB/s eta 0:00:01[K     |██████████████████████████████▋ | 51 kB 4.0 MB/s eta 0:00:01[K     |████████████████████████████████| 53 kB 756 kB/s 
[?25hInstalling collected packages: websocket-client
Successfully installed websocket-client-1.2.3
Collecting finnhub-python
  Downloading finnhub_python-2.4.8-py3-none-any.whl (10 kB)
Installing collected packages: finnhub-python
Successfully installed finnhub-python-2.4.8


In [2]:
import datetime
import json
import multiprocessing
import os
import time

import pandas as pd
import requests
import websocket

# API credentials. Set ALPHAVANTAGE_API_KEY / FINNHUB_API_KEY in the environment to
# override them without editing the notebook. The literals are kept only as a
# backward-compatible fallback.
# NOTE(review): these keys are committed in plain text; rotate them and drop the fallbacks.
apikey1 = os.environ.get("ALPHAVANTAGE_API_KEY", "6O301F2DIJ4X1B5K")  # Alpha Vantage (history)
apikey2 = os.environ.get("FINNHUB_API_KEY", "c7maqfiad3id8p04dgb0")  # Finnhub (realtime websocket)

## Trading Server & Client Thread


This thread obtains data from the two sources below and stitches it together, so that a consistent intraday price series can be built from them.

< Source 1 >
    
    Alpha Vantage API: Realtime and historical US stock prices.
    API documentation here: https://www.alphavantage.co/documentation/
    Keep in mind that the free version of this API is limited to at most 5 calls/min and 500 calls/day.

< Source 2 >
    
    Finnhub API: https://finnhub.io/
    API documentation here: https://finnhub.io/docs/api
    We use this API to gather real-time quotes; this is the "Quote" request in the documentation.
    The `t` field in the response is the timestamp of the price.


In [3]:
class ServerThread():
    """In-process "server" that downloads intraday prices, computes a signal and answers queries.

    State kept on the instance:
      * ``tickers``   - list of symbols currently served (defaults to ``['AAPL']``).
      * ``interval``  - bar size in minutes used for the Alpha Vantage request.
      * ``saved``     - ``{ticker: True}`` once the per-ticker CSV files have been written.
      * ``dfHistory`` - DataFrame with one row per (datetime, ticker) bar.

    Return-code conventions used by the query methods:
      * server queries / ``resetServer``: ``-1`` on error.
      * ``addTicker`` / ``delTicker``: ``0`` ok, ``1`` server error, ``2`` invalid/unknown ticker.
    """

    # The free Alpha Vantage tier allows 5 calls/min; sleeping before each batch is a crude throttle.
    REQUEST_DELAY_SECONDS = 60
    # Upper bound for a single HTTP request so a stalled connection cannot hang the notebook.
    REQUEST_TIMEOUT_SECONDS = 30
    MAX_TICKERS = 3
    # Bar sizes accepted by Alpha Vantage TIME_SERIES_INTRADAY.
    VALID_INTERVALS = (1, 5, 15, 30, 60)
    OHLCV_COLUMNS = ['open', 'high', 'low', 'close', 'volume']
    QUERY_DATE_FORMAT = "%Y-%m-%d-%H:%M"

    def __init__(self):
        self.tickers = ['AAPL'] #by default
        self.interval = 5 #min
        self.saved = {} #flag for only save one time
        self.dfHistory = pd.DataFrame()

    @staticmethod
    def _asTickerList(tickers):
        """Return ``tickers`` as a list; a bare string is one ticker, not a sequence of characters."""
        if isinstance(tickers, str):
            return [tickers]
        return list(tickers)

    #Source 1
    def getHistoryData(self, tickers, interval=5, outputsize="full"):
        """Download intraday history from Alpha Vantage.

        Args:
            tickers: a symbol or an iterable of symbols.
            interval: bar size in minutes.
            outputsize: value forwarded as the ``outputsize`` request parameter.

        Returns:
            list with one decoded JSON response per ticker, in request order.
        """
        tickers = self._asTickerList(tickers)
        time.sleep(self.REQUEST_DELAY_SECONDS)
        data = []
        for ticker in tickers:
            params = {
                'function': 'TIME_SERIES_INTRADAY',
                'symbol': ticker,
                'outputsize': outputsize,
                'interval': f'{interval}min',
                'apikey': apikey1,
            }
            r = requests.get('https://www.alphavantage.co/query', params=params,
                             timeout=self.REQUEST_TIMEOUT_SECONDS)
            data.append(r.json())
        return data

    #Source 2
    def getRealtimeData(self, tickers, interval=5, timeout=100):
        """Subscribe to the Finnhub websocket and collect the records it pushes.

        Blocks inside ``run_forever`` until the connection ends, then returns every
        record received in the ``data`` field of the incoming messages.

        Args:
            tickers: a symbol or an iterable of symbols to subscribe to.
            interval: ping interval in minutes.
            timeout: ping timeout in seconds.
        """
        tickers = self._asTickerList(tickers)
        data = []

        def on_message(ws, message):
            payload = json.loads(message)
            # Not every frame carries a "data" list; skip those that do not.
            if isinstance(payload, dict):
                data.extend(payload.get('data') or [])

        def on_error(ws, error):
            print(error)

        def on_close(ws, *args):
            # websocket-client 1.x also passes close_status_code and close_msg.
            print("### closed ###")

        def on_open(ws):
            for ticker in tickers:
                ws.send(json.dumps({"type": "subscribe", "symbol": ticker}))

        websocket.enableTrace(True)
        ws = websocket.WebSocketApp(f"wss://ws.finnhub.io?token={apikey2}",
                                    on_open = on_open,
                                    on_message = on_message,
                                    on_error = on_error,
                                    on_close = on_close)
        try:
            ws.run_forever(ping_interval=interval*60, ping_timeout=timeout)
        finally:
            ws.close()
        return data

    def _parseHistoryPayload(self, payload):
        """Turn one Alpha Vantage response into a DataFrame of bars for a single ticker.

        Raises KeyError/TypeError/ValueError/AttributeError when the payload does not
        have the expected layout (for example an error response).
        """
        series = payload[f'Time Series ({self.interval}min)']
        symbol = payload['Meta Data']['2. Symbol']
        records = []
        for stamp, bar in series.items():
            values = list(bar.values())
            # Fields are mapped by position, so the count must match exactly.
            if len(values) != len(self.OHLCV_COLUMNS):
                raise ValueError(f"unexpected bar layout at {stamp}")
            record = {'datetime': datetime.datetime.strptime(stamp, '%Y-%m-%d %H:%M:%S')}
            record.update(zip(self.OHLCV_COLUMNS, (float(value) for value in values)))
            records.append(record)
        frame = pd.DataFrame(records, columns=['datetime'] + self.OHLCV_COLUMNS)
        frame['ticker'] = symbol
        frame['price'] = frame['close']
        return frame

    def processHistoryData(self):
        """Download history for ``self.tickers`` and rebuild ``self.dfHistory`` from scratch.

        The history is replaced, not appended to, so repeated calls do not duplicate rows.
        ``self.dfHistory`` is only assigned once every ticker has been parsed successfully.

        Returns:
            None on success, -1 when nothing could be retrieved or a response could not be parsed.
        """
        historyData = self.getHistoryData(self.tickers, self.interval)

        if(len(historyData)<=0):
            print(f"Error: unable to retrieve data")
            return -1

        frames = []
        for data in historyData:
            print(data.keys())
            try:
                frames.append(self._parseHistoryPayload(data))
            except (KeyError, TypeError, ValueError, AttributeError):
                print(f"Error: unable to retrieve data with error message {data.values()}\n")
                return -1

        history = pd.concat(frames, ignore_index=True)
        self.dfHistory = history.sort_values(by=['datetime'], kind='mergesort')

        print(self.dfHistory.head())

    def processRealtimeData(self):
        """Placeholder for merging realtime data; currently does nothing and returns None.

        Disabled because no realtime data is available over the weekend and the
        websocket in ``getRealtimeData`` has no stop condition yet. Intended steps:

            data = self.getRealtimeData(self.tickers, self.interval)
            self.dfRealtime = pd.DataFrame.from_records(data)
            self.dfRealtime["datetime"] = self.dfRealtime['t'].apply(lambda x: datetime.datetime.fromtimestamp(x/1000.0))
            self.dfRealtime = self.dfRealtime.rename(columns={"p":"price", "s":"ticker", "v":"volume"})
        """
        return

    @staticmethod
    def _applyStrategy(bars, action):
        """Add rolling statistics, signals, position and PnL columns for ONE ticker.

        ``bars`` must be indexed by a sorted DatetimeIndex and contain a ``price`` column.
        Signals at row t only use values from row t-1, so they carry no look-ahead.
        """
        bars = bars.copy()
        price = bars['price'].astype(float)
        window = price.rolling('24h')
        bars['price'] = price
        bars['price_avg'] = window.mean()
        bars['price_std'] = window.std().fillna(0)  # std of a single observation is undefined

        prevPrice = price.shift()
        prevAvg = bars['price_avg'].shift()
        prevStd = bars['price_std'].shift()
        bars['long_signal'] = (prevPrice - prevAvg - prevStd) > 0
        bars['short_signal'] = (prevPrice - prevAvg + prevStd) < 0

        # +action on a long signal, -action on a short signal, 0 otherwise.
        position = (bars['long_signal'].astype(int) - bars['short_signal'].astype(int)) * action
        bars['position'] = position
        # Hold the last non-zero position; rows before the first signal stay flat (0).
        signal = position.mask(position == 0).ffill().fillna(0).astype(int)
        bars['signal'] = signal
        bars['pnl'] = (signal.shift() * price.pct_change()).fillna(0)
        return bars

    def calcStrategyPnL(self, strategy="trend"):
        """Compute the strategy signal and PnL for every ticker in ``self.dfHistory``.

        With ``strategy == "trend"`` the position is long when the previous price was
        above its 24h rolling mean plus one rolling std, and short when it was below
        the mean minus one std. Any other value reverses both positions.

        Statistics are computed per ticker on the ``price`` column, which
        ``processHistoryData`` sets equal to ``close``. Using ``price`` means a reloaded
        result CSV without OHLC columns also works. The first time a ticker is seen,
        ``<ticker>_result.csv`` and ``<ticker>_price.csv`` are written.

        Returns:
            float: the sum of the per-bar PnL over all tickers.

        Raises:
            ValueError: when no usable history is loaded.
        """
        required = {'datetime', 'ticker', 'price'}
        if self.dfHistory.empty or not required.issubset(self.dfHistory.columns):
            raise ValueError("no price history loaded; run a --tickers or --reload query first")

        action = 1 if strategy=="trend" else -1

        history = self.dfHistory.copy()
        history['datetime'] = pd.to_datetime(history['datetime']).to_numpy()
        # The index keeps the name 'datetime' so the saved CSVs can be read back by --reload.
        history.index = pd.DatetimeIndex(history['datetime'], name='datetime')

        frames = [
            self._applyStrategy(bars.sort_index(kind='mergesort'), action)
            for _, bars in history.groupby('ticker', sort=False)
        ]
        if not frames:
            raise ValueError("no price history loaded; run a --tickers or --reload query first")
        self.dfHistory = pd.concat(frames).sort_index(kind='mergesort')

        PnL = float(self.dfHistory['pnl'].sum())

        for ticker, rows in self.dfHistory.groupby('ticker', sort=False):
            if not self.saved.get(ticker, False):
                rows[['ticker','price', 'signal','pnl']].to_csv(f'{ticker}_result.csv')
                rows[['ticker','price']].to_csv(f'{ticker}_price.csv')
                self.saved[ticker] = True

        print(f"Strategy {strategy} yields PnL {PnL}")
        return PnL

    def processServerQuery(self, query):
        """Handle a server-side command.

        Supported commands:
          * ``--tickers [A,B,C]`` - optionally replace the ticker list (comma and/or
            space separated, at most ``MAX_TICKERS``), then download history and
            return the strategy PnL.
          * ``--reload <file.csv>`` - load history from a CSV with a ``datetime`` column.
          * ``--minutes <n>`` - change the bar size; ``n`` must be in ``VALID_INTERVALS``.

        Returns:
            The PnL for ``--tickers``, None for a successful ``--reload``/``--minutes``,
            -1 on any error.
        """
        token = query.split()
        if len(token)<=0:
            print("invalid query, please type again")
            return -1
        print(token)
        command, args = token[0], token[1:]

        if(command=="--tickers"):
            if args: #no argument keeps the current list (['AAPL'] by default)
                requested = " ".join(args)
                print(f"download new tickers {requested}")
                # Accept "A,B", "A, B" and "A B"; drop duplicates but keep the order.
                tickers = list(dict.fromkeys(requested.replace(",", " ").split()))
                if not tickers:
                    print("Error: invalid ticker, please re-specify\n")
                    return -1
                # Check the count first so no API calls are spent on a request that is rejected anyway.
                if(len(tickers)>self.MAX_TICKERS):
                    print(f"currently max support {self.MAX_TICKERS} tickers, please re-specify\n")
                    return -1
                if not self.checkTickerValid(tickers):
                    print("Error: invalid ticker, please re-specify\n")
                    return -1
                self.tickers = tickers
            if self.processHistoryData()==-1 or self.processRealtimeData()==-1: return -1
            return self.calcStrategyPnL()

        elif(command=="--reload"):
            if not args:
                print("Error: currently only supports csv file, please re-specify\n")
                return -1
            path = " ".join(args)
            if path[-3:]!="csv":
                print("Error: currently only supports csv file, please re-specify\n")
                return -1
            try:
                self.dfHistory = pd.read_csv(path, parse_dates=["datetime"])
            except (OSError, ValueError) as error:
                print(f"Error: unable to load {path}: {error}\n")
                return -1

        elif(command=="--minutes"):
            print("change interval minutes")
            if(not args or not args[0].isdigit() or int(args[0]) not in self.VALID_INTERVALS):
                print(f"Error: currently only supports intervals in {list(self.VALID_INTERVALS)}, please re-specify\n")
                return -1
            self.interval = int(args[0])

        else:
            print("invalid query, please type again")
            return -1

    def processClientQuery(self, query):
        """Handle a client-side command.

        Supported commands:
          * ``--price <when>`` / ``--signal <when>`` - ``<when>`` is ``now`` or a
            timestamp formatted as ``YYYY-MM-DD-HH:MM``; returns ``queryData``'s result.
          * ``--add_ticker <T>`` / ``--del_ticker <T>`` - returns the 0/1/2 status code.
          * ``--reset`` - re-download everything; returns ``resetServer``'s result.

        Returns -1 when the command is unknown or its argument is missing or malformed.
        """
        token = query.split()
        if len(token)<=0:
            print("invalid query, please type again")
            return -1
        command, args = token[0], token[1:]

        if(command=="--price" or command=="--signal"):
            if not args:
                print("Error: invalid date time, please re-specify\n")
                return -1
            if args[0]=="now":
                # NOTE(review): "now" only matches when a bar exists for the current minute.
                return self.queryData(command, datetime.datetime.today())
            try:
                date = datetime.datetime.strptime(str(args[0]), self.QUERY_DATE_FORMAT)
            except ValueError:
                print("Error: invalid date time, please re-specify\n")
                return -1
            return self.queryData(command, date)

        elif(command=="--del_ticker"):
            if not args:
                print("Error: invalid ticker, please re-specify\n")
                return -1
            return self.delTicker(args[0])

        elif(command=="--add_ticker"):
            if not args:
                print("Error: invalid ticker, please re-specify\n")
                return -1
            return self.addTicker(args[0])

        elif(command=="--reset"):
            return self.resetServer()

        print("invalid query, please type again")
        return -1

    def queryData(self, column, date):
        """Return the ``ticker`` and requested column for the bar(s) at ``date`` (minute precision).

        Args:
            column: ``"--price"`` or ``"--signal"`` (the leading dashes are stripped).
            date: ``datetime.datetime`` to look up.

        Returns:
            DataFrame with the matching rows, the string ``"Server has no data"`` when
            nothing matches or the column is not loaded, or -1 for an unsupported column.
        """
        column = column[2:]
        if column!="price" and column!="signal":
            print("Error: invalid query, must be price or signal")
            return -1
        df = self.dfHistory
        if df.empty or not {"datetime", "ticker", column}.issubset(df.columns):
            return "Server has no data"
        stamps = pd.to_datetime(df["datetime"]).dt.strftime(self.QUERY_DATE_FORMAT)
        matches = (stamps == date.strftime(self.QUERY_DATE_FORMAT)).to_numpy()
        data = df.loc[matches, ["ticker", column]]
        if data.shape[0]==0: return "Server has no data"
        return data

    def checkTickerValid(self, tickers):
        """Return True when Alpha Vantage knows every ticker in ``tickers``.

        Accepts a single symbol or an iterable of symbols. All symbols are checked in
        one ``getHistoryData`` batch, so the throttle delay is paid only once.
        """
        tickers = self._asTickerList(tickers)
        if not tickers:
            return False
        historyData = self.getHistoryData(tickers, self.interval, "compact")
        if len(historyData) < len(tickers):
            return False
        return all('Error Message' not in response for response in historyData) #invalid ticker

    def addTicker(self, ticker):
        """Add ``ticker`` to the served list and refresh history and PnL.

        Returns:
            0 on success (or if already served), 1 on server error or when the
            ticker limit is reached, 2 when the ticker is invalid.
        """
        if ticker in self.tickers: return 0
        if len(self.tickers) >= self.MAX_TICKERS:
            print(f"currently max support {self.MAX_TICKERS} tickers, please re-specify\n")
            return 1
        if not self.checkTickerValid([ticker]): return 2
        print(f"start to download data for new ticker {ticker}")
        previous = list(self.tickers)
        self.tickers = previous + [ticker]
        if self.processHistoryData()==-1 or self.processRealtimeData()==-1:
            self.tickers = previous #roll back so the list matches the loaded history
            return 1 #server error
        self.calcStrategyPnL()
        return 0

    def delTicker(self, ticker):
        """Stop serving ``ticker`` and drop its rows from the loaded history.

        Returns:
            0 on success, 2 when the ticker is not currently served.
        """
        if ticker not in self.tickers:
            return 2
        self.tickers.remove(ticker)
        if 'ticker' in self.dfHistory.columns:
            self.dfHistory = self.dfHistory[self.dfHistory['ticker'] != ticker]
        return 0

    def resetServer(self):
        """Re-download history for the current tickers and recompute the strategy.

        Clears the ``saved`` flags so the CSV files are written again.

        Returns:
            0 on success, -1 when the download failed.
        """
        self.saved = {} #re-download all the data

        if self.processHistoryData()==-1 or self.processRealtimeData()==-1: return -1
        self.calcStrategyPnL()
        return 0

    def getServerTickers(self):
        """Return a copy of the served ticker list, so callers cannot mutate server state."""
        return list(self.tickers)

### Server Tests

Test server queries, strategy PnL calc etc.

In [4]:
# Create the server with its default state: tickers ['AAPL'], 5-minute bars, empty history.
server = ServerThread()

In [5]:
# Download the AAPL history and compute the default ("trend") strategy PnL.
# The cell displays the value returned by processServerQuery.
#query = input("Please specify your query, supported command include \"--tickers, --reload, --minutes\"\n")
query = "--tickers AAPL"
server.processServerQuery(query)

['--tickers', 'AAPL']
download new tickers AAPL
dict_keys(['Meta Data', 'Time Series (5min)'])
                datetime    open    high  ...   volume  ticker   price
3571 2021-12-27 04:05:00  176.40  176.41  ...   7112.0    AAPL  176.38
3570 2021-12-27 04:10:00  176.28  176.32  ...  10249.0    AAPL  176.32
3569 2021-12-27 04:15:00  176.28  176.28  ...   7903.0    AAPL  176.17
3568 2021-12-27 04:20:00  176.01  176.01  ...   1245.0    AAPL  176.01
3567 2021-12-27 04:25:00  175.93  175.93  ...   1198.0    AAPL  175.91

[5 rows x 8 columns]
Strategy trend yields PnL -0.048182392332353885


-0.048182392332353885

In [6]:
# Both runs reuse the history loaded above. Any strategy other than "trend" takes the
# opposite position, so the two PnLs are expected to be equal in size and opposite in sign.
PnL1 = server.calcStrategyPnL("trend")
PnL2 = server.calcStrategyPnL("meanrev")
print(PnL1, PnL2)

Strategy trend yields PnL -0.048182392332353885
Strategy meanrev yields PnL 0.048182392332353885
-0.048182392332353885 0.048182392332353885


### Client Tests

In [7]:
# Start the client tests from a fresh server (default tickers, no history loaded yet).
server = ServerThread()

In [8]:
# NFLX is not in the default ticker list, so the delete request leaves the list unchanged.
# Only the last expression of the cell is displayed; the first getServerTickers() result is discarded.
#query = input("please type in your query: ")
server.getServerTickers()

query = "--del_ticker NFLX"
server.processClientQuery(query)
server.getServerTickers()

['AAPL']

In [9]:
# Load history from the result CSV that calcStrategyPnL wrote in the server tests.
# This requires AAPL_result.csv to exist in the working directory.
query = "--reload AAPL_result.csv"
server.processServerQuery(query)

['--reload', 'AAPL_result.csv']


In [10]:

# Look up the trading signal for one bar; the timestamp format is YYYY-MM-DD-HH:MM.
query = "--signal 2021-12-27-04:45"
server.processClientQuery(query)

Unnamed: 0,ticker,signal
8,AAPL,-1


In [11]:
# Look up the price for one bar, using the same timestamp format as the signal query.
query = "--price 2021-12-27-04:20"
server.processClientQuery(query)

Unnamed: 0,ticker,price
3,AAPL,176.01


In [12]:
# A timestamp with no matching bar in the loaded history returns 'Server has no data'.
query = "--price 2016-07-29-13:34"
server.processClientQuery(query)

'Server has no data'