# Algorithmic Crypto Trading with Websockets

## Websocket Basic Usage Example

In this example, we use the Stream object from alpaca-trade-api to stream real-time Bitcoin (BTCUSD) quote and trade data.

In the main() function, we call the subscribe_crypto_quotes and subscribe_crypto_trades methods of the Stream object to specify which data we want to receive. The first parameter is the handler function, print_quote or print_trade, which determines what happens to each message once it is received. The remaining parameters are the symbols to subscribe to.

The handlers are asynchronous functions, so they only execute once data is received. A handler can also be registered with a decorator, as the bar handler in main() shows. Finally, we run the stream!

In [None]:
from alpaca_trade_api.stream import Stream

ALPACA_API_KEY = 'APCA-API-KEY-ID'
ALPACA_SECRET_KEY = 'APCA-API-SECRET-KEY'

async def print_quote(q):
    print('quote', q)

async def print_trade(t):
    print('trade', t)

def main():
    stream = Stream(ALPACA_API_KEY, ALPACA_SECRET_KEY, raw_data=True)
    stream.subscribe_crypto_quotes(print_quote, 'BTCUSD')
    stream.subscribe_crypto_trades(print_trade, 'BTCUSD')

    # Crypto bars use the crypto-specific decorator; on_bar registers stock bars
    @stream.on_crypto_bar('BTCUSD')
    async def _(bar):
        print('bar', bar)

    stream.run()

if __name__ == "__main__":
    main()
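
To make the callback model concrete, here is a minimal sketch that does not touch the Alpaca API at all. FakeStream, its subscribe and dispatch methods, and the message keys are hypothetical stand-ins; the sketch only illustrates the idea that a handler is registered per symbol and awaited when a matching message arrives.

```python
import asyncio

class FakeStream:
    """Hypothetical stand-in for a streaming client (not the Alpaca Stream)."""

    def __init__(self):
        self._handlers = {}

    def subscribe(self, handler, *symbols):
        # Remember which async handler should receive each symbol's messages.
        for symbol in symbols:
            self._handlers[symbol] = handler

    async def dispatch(self, message):
        # Await the handler registered for this symbol, if there is one.
        handler = self._handlers.get(message["symbol"])
        if handler is not None:
            await handler(message)

received = []

async def record_quote(q):
    received.append(q)

async def demo():
    stream = FakeStream()
    stream.subscribe(record_quote, "BTCUSD")
    # Only the BTCUSD message has a registered handler; ETHUSD is ignored.
    await stream.dispatch({"symbol": "BTCUSD", "price": 100.0})
    await stream.dispatch({"symbol": "ETHUSD", "price": 10.0})

asyncio.run(demo())
print(received)
```

The handler never runs on its own schedule. It runs only when the stream awaits it with a message, which is why the handlers in this notebook are declared with async def.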

## Websocket Reconnection

If the websocket disconnects, the stream stops delivering data. To recover, we can wrap the run() call from the previous example in a try/except block so that the websocket can reconnect.

For trading systems that rely on real-time data, it's essential to use a data stream that keeps running regardless of errors. In this example, the try/except block prevents the program from ending if the websocket connection runs into an error. Instead, the error is printed and, after a set period of time, the program tries to re-establish the connection to the websocket.

In [None]:
import asyncio
import logging
import time
from alpaca_trade_api.stream import Stream
from alpaca_trade_api.common import URL

logging.basicConfig(format='%(asctime)s %(message)s', level=logging.INFO)

ALPACA_API_KEY = '********************'
ALPACA_SECRET_KEY = '****************************************'

def run_connection(conn):
    # Loop rather than recurse so repeated reconnects cannot exhaust the call stack
    while True:
        try:
            conn.run()
        except KeyboardInterrupt:
            print("Interrupted execution by user")
            asyncio.get_event_loop().run_until_complete(conn.stop_ws())
            exit(0)
        except Exception as e:
            print(f'Exception from websocket connection: {e}')
        # Reached only after an error or an unexpected return from run()
        print("Trying to re-establish connection")
        time.sleep(3)

async def print_quote(q):
    print('quote', q)

if __name__ == '__main__':
    conn = Stream(ALPACA_API_KEY,
                  ALPACA_SECRET_KEY,
                  base_url=URL('https://paper-api.alpaca.markets'))

    # print_quote labels its output as quotes, so subscribe to the quote stream
    conn.subscribe_crypto_quotes(print_quote, 'BTCUSD')

    run_connection(conn)
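
A fixed pause between reconnection attempts is the simplest option. One common alternative is to lengthen the pause after each consecutive failure (exponential backoff) and give up after a limit. The sketch below is library-agnostic: run_with_backoff, its parameters, and flaky_connect are hypothetical names introduced for illustration, and connect stands in for whatever call starts the stream.

```python
import time

def run_with_backoff(connect, max_attempts=5, base_delay=1.0, max_delay=30.0,
                     sleep=time.sleep):
    """Call connect() until it returns normally or the attempts run out.

    Returns the list of delays that were waited between attempts.
    """
    delays = []
    for attempt in range(max_attempts):
        try:
            connect()
            return delays
        except Exception as exc:  # KeyboardInterrupt is not caught here
            # Double the wait after every failure, capped at max_delay.
            delay = min(base_delay * 2 ** attempt, max_delay)
            delays.append(delay)
            print(f"attempt {attempt + 1} failed ({exc}); retrying in {delay}s")
            sleep(delay)
    raise RuntimeError("gave up after repeated connection failures")

# Simulated connection that fails twice and then succeeds.
calls = {"n": 0}

def flaky_connect():
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("socket closed")

# Pass a no-op sleep so the demonstration finishes immediately.
delays = run_with_backoff(flaky_connect, sleep=lambda seconds: None)
print(delays)
```

Injecting the sleep function keeps the retry logic testable without real waiting. In a live system the default time.sleep would be used.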

## Websocket Pause/Resume Data Stream

To stop and start the websocket connection at will, we can run the stream in a worker thread using the ThreadPoolExecutor class from the concurrent.futures module. This lets the main thread shut down the websocket connection and start it again later.

In [None]:
import asyncio
import logging
import time
from concurrent.futures import ThreadPoolExecutor
from alpaca_trade_api.stream import Stream
from alpaca_trade_api.common import URL

ALPACA_API_KEY = '********************'
ALPACA_SECRET_KEY = '****************************************'

async def print_quote(q):
    print('quote', q)

def consumer_thread():
    # Worker threads have no event loop by default, so create one if needed
    try:
        asyncio.get_event_loop()
    except RuntimeError:
        asyncio.set_event_loop(asyncio.new_event_loop())

    global conn
    conn = Stream(ALPACA_API_KEY,
                  ALPACA_SECRET_KEY,
                  base_url=URL('https://paper-api.alpaca.markets'))

    conn.subscribe_crypto_quotes(print_quote, 'BTCUSD')
    conn.run()


if __name__ == '__main__':
    logging.basicConfig(format='%(asctime)s  %(levelname)s %(message)s',
                        level=logging.INFO)

    pool = ThreadPoolExecutor(1)
    loop = asyncio.get_event_loop()

    while 1:
        try:
            pool.submit(consumer_thread)
            time.sleep(20)
            # stop_ws() is a coroutine, so it has to be run to completion
            loop.run_until_complete(conn.stop_ws())
            time.sleep(20)
        except KeyboardInterrupt:
            print("Interrupted execution by user")
            loop.run_until_complete(conn.stop_ws())
            exit(0)
        except Exception as e:
            print("You got an exception: {} during execution. continue "
                  "execution.".format(e))
            # let the execution continue
            pass
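
The same pause/resume pattern can be tried without a network connection. In the sketch below, produce is a hypothetical stand-in for a streaming loop, and a threading.Event plays the role of the stop signal; none of these names come from alpaca-trade-api.

```python
import asyncio
import threading
import time
from concurrent.futures import ThreadPoolExecutor

ticks = []

async def produce(stop_event):
    # Stand-in for a streaming loop: record a tick until asked to stop.
    while not stop_event.is_set():
        ticks.append(time.monotonic())
        await asyncio.sleep(0.01)

def consumer(stop_event):
    # asyncio.run creates and closes an event loop inside the worker thread.
    asyncio.run(produce(stop_event))

def stream_for(pool, seconds):
    stop_event = threading.Event()
    future = pool.submit(consumer, stop_event)
    time.sleep(seconds)   # the stream is running
    stop_event.set()      # ask the worker to stop
    future.result()       # wait until it has actually stopped

pool = ThreadPoolExecutor(1)
stream_for(pool, 0.05)
count_after_first_run = len(ticks)
time.sleep(0.05)          # paused: nothing is produced here
count_while_paused = len(ticks)
stream_for(pool, 0.05)    # resumed
pool.shutdown()
print(count_after_first_run, count_while_paused, len(ticks))
```

The tick count stays flat during the pause and grows again after the second submit, which is the behavior the 20-second on/off cycle above aims for.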

## Websocket Dynamic Subscription

Instead of subscribing to data for only one cryptocurrency, we can set up the websocket to change the subscription on demand.

In this case, we set up a dictionary that maps each symbol to the handler function we want to execute for it. Then, a for loop iterates through the dictionary items and switches the websocket subscription to each symbol in turn.

In [None]:
import asyncio
import logging
import threading
import time
from alpaca_trade_api.stream import Stream
from alpaca_trade_api.common import URL

ALPACA_API_KEY = '********************'
ALPACA_SECRET_KEY = '****************************************'

async def print_quote(q):
    print('quote', q)

PREVIOUS = None

def consumer_thread():
    # Threads other than the main one have no event loop by default
    try:
        asyncio.get_event_loop()
    except RuntimeError:
        asyncio.set_event_loop(asyncio.new_event_loop())

    global conn
    conn = Stream(ALPACA_API_KEY,
                  ALPACA_SECRET_KEY,
                  base_url=URL('https://paper-api.alpaca.markets'))

    conn.subscribe_crypto_quotes(print_quote, 'BTCUSD')
    global PREVIOUS
    PREVIOUS = "BTCUSD"
    conn.run()

if __name__ == '__main__':
    logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s',
                        level=logging.INFO)
    threading.Thread(target=consumer_thread).start()
    time.sleep(5)  # give the initial connection time to be established
    subscriptions = {"ETHUSD": print_quote,
                     "BTCUSD": print_quote,
                     "DOGEUSD": print_quote,
                     }

    while 1:
        for ticker, handler in subscriptions.items():
            # Drop the previous symbol before subscribing to the next one
            conn.unsubscribe_crypto_quotes(PREVIOUS)
            conn.subscribe_crypto_quotes(handler, ticker)
            PREVIOUS = ticker
            time.sleep(20)
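
The rotation step can be checked in isolation with a stand-in object. RecordingStream and rotate are hypothetical names for this sketch; the stand-in only mirrors the subscribe/unsubscribe method names and records what was called, so no connection is needed.

```python
class RecordingStream:
    """Hypothetical stand-in that only records subscription calls."""

    def __init__(self):
        self.active = set()
        self.log = []

    def subscribe_crypto_quotes(self, handler, *symbols):
        self.active.update(symbols)
        self.log.append(("subscribe", symbols))

    def unsubscribe_crypto_quotes(self, *symbols):
        self.active.difference_update(symbols)
        self.log.append(("unsubscribe", symbols))

def rotate(stream, previous, ticker, handler):
    # Drop the previous symbol (if any), then add the next one.
    if previous is not None:
        stream.unsubscribe_crypto_quotes(previous)
    stream.subscribe_crypto_quotes(handler, ticker)
    return ticker

async def print_quote(q):
    print('quote', q)

stream = RecordingStream()
previous = None
for ticker in ["BTCUSD", "ETHUSD", "DOGEUSD"]:
    previous = rotate(stream, previous, ticker, print_quote)

print(stream.active)
print(stream.log)
```

After the loop, only the last symbol remains active. Skipping the unsubscribe step would leave every symbol subscribed at once, which defeats the purpose of rotating.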

## Websocket Live Trading Example

Now that we know how to stream live data with Alpaca, we can start building a trading bot based on cross-sectional momentum in the crypto markets.

Typical cross-sectional momentum strategies rank securities by their recent returns, then go long the best-performing assets and short the worst-performing ones, on the expectation that the prevailing trend in both cases will continue. Since Alpaca does not support shorting cryptocurrency, this tutorial covers only the long side: buying the best-performing crypto over a recent lookback window of days.
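
As a small worked example of the ranking idea, the sketch below computes a trailing return for each asset and picks the highest one. The symbols and prices are made up for illustration, and trailing_return and pick_top are hypothetical helpers, not part of any library. The bot in this section works from pandas dataframes of daily closes instead.

```python
def trailing_return(prices, lookback):
    """Return over the last `lookback` periods: P[-1] / P[-1 - lookback] - 1."""
    return prices[-1] / prices[-1 - lookback] - 1

def pick_top(price_history, lookback):
    # Rank every asset by its trailing return and keep the best one.
    returns = {symbol: trailing_return(prices, lookback)
               for symbol, prices in price_history.items()}
    best = max(returns, key=returns.get)
    return best, returns

# Made-up daily closes for three hypothetical assets.
price_history = {
    "AAAUSD": [100.0, 102.0, 101.0, 110.0],
    "BBBUSD": [50.0, 49.0, 51.0, 52.0],
    "CCCUSD": [10.0, 10.5, 10.2, 9.0],
}

best, returns = pick_top(price_history, lookback=3)
print(best, returns)
```

A long-only version of the strategy holds just the asset returned by pick_top and switches whenever a different asset takes the top rank.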

In [None]:
# Import Dependencies
import numpy as np
import pandas as pd
import alpaca_trade_api as tradeapi
import datetime as dt

# API Credentials
ALPACA_API_KEY = '********************'
ALPACA_SECRET_KEY = '****************************************'
api = tradeapi.REST(ALPACA_API_KEY, ALPACA_SECRET_KEY, 'https://paper-api.alpaca.markets')
# Start from a flat account: this liquidates every open position
api.close_all_positions()

# Date Variables
start_date = dt.date.today() - dt.timedelta(days = 60)
end_date = dt.date.today()

# Check Whether Account Currently Holds Symbol
def check_positions(symbol):
    positions = api.list_positions()
    for p in positions:
        if p.symbol == symbol:
            return float(p.qty)
    return 0

# Cross Sectional Momentum Bot Function
def cross_sectional_momentum(bar):
    try:
        # Get the Latest Data
        dataframe = pd.DataFrame()
        symbols = ['BTCUSD','ETHUSD','DOGEUSD','SHIBUSD','MATICUSD','ALGOUSD','AVAXUSD','LINKUSD','SOLUSD']
        for symbol in symbols:
            data = api.get_crypto_bars(symbol, tradeapi.TimeFrame(1, tradeapi.TimeFrameUnit.Day), start=start_date, end=end_date, exchanges=['FTXU']).df['close']
            data = pd.DataFrame(data).rename(columns={"close": str(symbol)})
            dataframe = pd.concat([dataframe,data], axis=1, sort=False)

        returns_data = dataframe.apply(func = lambda x: x.shift(-1)/x - 1, axis = 0)

        # Calculate Momentum Dataframe
        momentum_df = returns_data.apply(func = lambda x: x.shift(1)/x.shift(7) - 1, axis = 0)
        momentum_df = momentum_df.rank(axis = 1)
        for col in momentum_df.columns:
            momentum_df[col] = np.where(momentum_df[col] > 8, 1, 0)

        # Get Symbol with Highest Momentum
        momentum_df['Buy'] = momentum_df.astype(bool).dot(momentum_df.columns)
        buy_symbol = momentum_df['Buy'].iloc[-1]
        old_symbol = momentum_df['Buy'].iloc[-2]

        # Account Details
        current_position = check_positions(symbol=buy_symbol)
        old_position = check_positions(symbol=old_symbol)

        # No Current Positions
        if current_position == 0 and old_position == 0:
            cash_balance = api.get_account().non_marginable_buying_power
            api.submit_order(buy_symbol, notional=cash_balance, side='buy')
            message = f'Symbol: {buy_symbol} | Side: Buy | Notional: {cash_balance}'
            print(message)

        # No Current Position and Yes Old Position
        if current_position == 0 and old_position > 0:
            # close_position expects the symbol to liquidate; old_position is the quantity held
            api.close_position(old_symbol)
            message = f'Symbol: {old_symbol} | Side: Sell'
            print(message)

            cash_balance = api.get_account().non_marginable_buying_power
            api.submit_order(buy_symbol, notional=cash_balance, side='buy')
            message = f'Symbol: {buy_symbol} | Side: Buy | Notional: {cash_balance}'
            print(message)

        print("-"*20)

    except Exception as e:
        print(e)

# Create instance of Alpaca data streaming API
alpaca_stream = tradeapi.Stream(ALPACA_API_KEY, ALPACA_SECRET_KEY, raw_data=True, crypto_exchanges=['FTXU'])

# Create handler for receiving live bar data
async def on_crypto_bar(bar):
    print(bar)
    cross_sectional_momentum(bar)

# Subscribe to data and assign handler
alpaca_stream.subscribe_crypto_daily_bars(on_crypto_bar, 'BTCUSD','ETHUSD','DOGEUSD','SHIBUSD','MATICUSD','ALGOUSD','AVAXUSD','LINKUSD','SOLUSD')

# Start streaming of data
alpaca_stream.run()