Will Munson - Quantitative Researcher - [LinkedIn](https://www.linkedin.com/in/will-munson-814bb01a3/)

Building an Algorithmic Trading Platform

In [2]:
pip install v20 #The OANDA v20 REST API provides access to OANDA's v20 trading engine

Collecting v20
  Downloading v20-3.0.25.0.tar.gz (75 kB)
[K     |████████████████████████████████| 75 kB 1.3 MB/s eta 0:00:011
Collecting ujson
  Downloading ujson-4.0.2-cp36-cp36m-manylinux1_x86_64.whl (179 kB)
[K     |████████████████████████████████| 179 kB 6.2 MB/s eta 0:00:01
Building wheels for collected packages: v20
  Building wheel for v20 (setup.py) ... [?25ldone
[?25h  Created wheel for v20: filename=v20-3.0.25.0-py3-none-any.whl size=85763 sha256=d3415b2518aa10a3251c30634fabff945e6ef4e34bfef14bef06ef4e714257f8
  Stored in directory: /home/jovyan/.cache/pip/wheels/a3/0e/23/87105704f84e1ef9bb4c73e863d601a5748d3ec9d9c030ee28
Successfully built v20
Installing collected packages: ujson, v20
Successfully installed ujson-4.0.2 v20-3.0.25.0
Note: you may need to restart the kernel to use updated packages.


In [3]:
pip install alpha_vantage #module used to get stock data from the Alpha Vantage API

Collecting alpha_vantage
  Downloading alpha_vantage-2.3.1-py3-none-any.whl (31 kB)
Collecting aiohttp
  Downloading aiohttp-3.7.4.post0-cp36-cp36m-manylinux2014_x86_64.whl (1.3 MB)
[K     |████████████████████████████████| 1.3 MB 7.5 MB/s eta 0:00:01
Collecting async-timeout<4.0,>=3.0
  Downloading async_timeout-3.0.1-py3-none-any.whl (8.2 kB)
Collecting multidict<7.0,>=4.5
  Downloading multidict-5.1.0-cp36-cp36m-manylinux2014_x86_64.whl (141 kB)
[K     |████████████████████████████████| 141 kB 38.5 MB/s eta 0:00:01
[?25hCollecting yarl<2.0,>=1.0
  Downloading yarl-1.6.3-cp36-cp36m-manylinux2014_x86_64.whl (293 kB)
[K     |████████████████████████████████| 293 kB 37.3 MB/s eta 0:00:01
[?25hCollecting idna-ssl>=1.0
  Downloading idna-ssl-1.1.0.tar.gz (3.4 kB)
Building wheels for collected packages: idna-ssl
  Building wheel for idna-ssl (setup.py) ... [?25ldone
[?25h  Created wheel for idna-ssl: filename=idna_ssl-1.1.0-py3-none-any.whl size=3160 sha256=22f5140efd26a3dd677a44bc8

The generic Broker class

In [4]:
from abc import abstractmethod # abc (abstract base class) is used to implement abstract methods
# Abstract methods are methods that are declared, but not implemented.


class Broker(object):
	 # Constructor class 
	def __init__(self, host, port):
		self.host = host # Set host
		self.port = port # Set port

		# Keep track of price, order and position 
		self.__price_event_handler = None
		self.__order_event_handler = None
		self.__position_event_handler = None

	@property
	def on_price_event(self): # Price getter
		"""
		Listeners will receive:
		symbol, bid, ask
		"""
		return self.__price_event_handler

	@on_price_event.setter
	def on_price_event(self, event_handler): # Sets the price
		self.__price_event_handler = event_handler

	@property
	def on_order_event(self):
		"""
		Listeners will receive:
		transaction_id
		"""
		return self.__order_event_handler
    
	@on_order_event.setter
	def on_order_event(self, event_handler):
		self.__order_event_handler = event_handler

	@property
	def on_position_event(self):
		"""
		Listeners will receive:
		symbol, is_long, units, unrealized_pnl, pnl
		"""
		return self.__position_event_handler

	@on_position_event.setter
	def on_position_event(self, event_handler):
		self.__position_event_handler = event_handler

	@abstractmethod
	def get_prices(self, symbols=[]):
		"""
		Query market prices from a broker
		:param symbols: list of symbols recognized by your broker
		"""
		raise NotImplementedError('Method is required!')

	@abstractmethod
	def stream_prices(self, symbols=[]):
		""""
		Continuously stream prices from a broker.
		:param symbols: list of symbols recognized by your broker
		"""
		raise NotImplementedError('Method is required!')

	@abstractmethod
	def send_market_order(self, symbol, quantity, is_buy):
		raise NotImplementedError('Method is required!')

Oanda Broker class

In [5]:
import v20 #The OANDA v20 REST API provides access to OANDA's v20 trading engine


class OandaBroker(Broker):
	#Practice recommended for testing 
	PRACTICE_API_HOST = 'api-fxpractice.oanda.com'
	PRACTICE_STREAM_HOST = 'stream-fxpractice.oanda.com'

	# Live recommended for production-ready code to be executed 
	LIVE_API_HOST = 'api-fxtrade.oanda.com'
	LIVE_STREAM_HOST = 'stream-fxtrade.oanda.com'

	PORT = '443'

	def __init__(self, accountid, token, is_live=False):
		if is_live: # If the information is valid,
			#Code is ready to execute 
			host = self.LIVE_API_HOST
			stream_host = self.LIVE_STREAM_HOST
		else: #Otherwise, the user is sent back for testing
			host = self.PRACTICE_API_HOST
			stream_host = self.PRACTICE_STREAM_HOST

		super(OandaBroker, self).__init__(host, self.PORT)

		self.accountid = accountid # Holds your account ID
		self.token = token # Holds your token ID

		self.api = v20.Context(host, self.port, token=token)
		self.stream_api = v20.Context(stream_host, self.port, token=token)
        
	def get_prices(self, symbols=[]):
		response = self.api.pricing.get(
			self.accountid,
			instruments=",".join(symbols),
			snapshot=True,
			includeUnitsAvailable=False
		)
		body = response.body
		prices = body.get('prices', [])
		for price in prices:
			self.process_price(price)

	def process_price(self, price):
		symbol = price.instrument

		if not symbol: #If price symbol is missing, indicate that it's empty
			print('Price symbol is empty!')
			return

		# Selling (bid) and buying (ask) prices
		bids = price.bids or []
		price_bucket_bid = bids[0] if bids and len(bids) > 0 else None # Keep track of each and every bid made, unless there were none.
		bid = price_bucket_bid.price if price_bucket_bid else 0

		asks = price.asks or []
		price_bucket_ask = asks[0] if asks and len(asks) > 0 else None
		ask = price_bucket_ask.price if price_bucket_ask else 0

		self.on_price_event(symbol, bid, ask)
        
	def stream_prices(self, symbols=[]):
		response = self.stream_api.pricing.stream(
			self.accountid,
			instruments=",".join(symbols),
			snapshot=True
		)

		for msg_type, msg in response.parts():
			if msg_type == "pricing.Heartbeat":
				continue
			elif msg_type == "pricing.ClientPrice":
				self.process_price(msg)

	def send_market_order(self, symbol, quantity, is_buy):
		response = self.api.order.market(
			self.accountid,
			units=abs(quantity) * (1 if is_buy else -1),
			instrument=symbol,
			type='MARKET',
		)
		if response.status != 201: # If the http status is not 201 (or CREATED)
			self.on_order_event(symbol, quantity, is_buy, None, 'NOT_FILLED')
			return

		body = response.body
		if 'orderCancelTransaction' in body: #If there is a cancelled transaction, it's not filled
			self.on_order_event(symbol, quantity, is_buy, None, 'NOT_FILLED')
			return
        
		 # Otherwise, the transaction is filled
		transaction_id = body.get('lastTransactionID', None)
		self.on_order_event(symbol, quantity, is_buy, transaction_id, 'FILLED')
        
	def get_positions(self):
		response = self.api.position.list(self.accountid)
		body = response.body
		positions = body.get('positions', [])
		for position in positions:
			symbol = position.instrument
			unrealized_pnl = position.unrealizedPL
			pnl = position.pl
			long = position.long
			short = position.short

			if short.units:
				self.on_position_event(
					symbol, False, short.units, unrealized_pnl, pnl)
			elif long.units:
				self.on_position_event(
					symbol, True, long.units, unrealized_pnl, pnl)
			else:
				self.on_position_event(
					symbol, None, 0, unrealized_pnl, pnl)

Getting prices

In [6]:
# Replace these 2 values with your own!
ACCOUNT_ID = '001-001-6645794-001'
API_TOKEN = '6ecf6b053262c590b78bb8199b85aa2f-d99c54aecb2d5b4583a9f707636e8009' # Hash code (this is taking me forever)

broker = OandaBroker(ACCOUNT_ID, API_TOKEN)

In [7]:
SYMBOL = 'EUR_USD' # In this example, we're converting the Euro to the US Dollar

In [8]:
import datetime as dt # Code that displays the date and time 

def on_price_event(symbol, bid, ask):
    print(
        dt.datetime.now(), '[PRICE]', #Set to the current date and time
        symbol, 'bid:', bid, 'ask:', ask
    )

broker.on_price_event = on_price_event

In [9]:
broker.get_prices(symbols=[SYMBOL]) #I think something's supposed to print after this statement.

Sending a simple market order

In [10]:
def on_order_event(symbol, quantity, is_buy, transaction_id, status):
    print(
        dt.datetime.now(), '[ORDER]', # Current date and time
        'transaction_id:', transaction_id,
        'status:', status,
        'symbol:', symbol,
        'quantity:', quantity,
        'is_buy:', is_buy,
    )

broker.on_order_event = on_order_event
broker.send_market_order(SYMBOL, 1, True) #Set values for symbol, quantity and is_buy

2021-06-27 19:35:50.098261 [ORDER] transaction_id: None status: NOT_FILLED symbol: EUR_USD quantity: 1 is_buy: True


Getting position updates

In [11]:
def on_position_event(symbol, is_long, units, upnl, pnl):
    print(
        dt.datetime.now(), '[POSITION]', # Current date and time 
        'symbol:', symbol,
        'is_long:', is_long,
        'units:', units,
        'upnl:', upnl,
        'pnl:', pnl
    )

broker.on_position_event = on_position_event
broker.get_positions()

Building a mean-reverting algorithmic trading system

In [12]:
import datetime as dt # Code that displays the date and time
import pandas as pd # Used for high-level indexing


class MeanReversionTrader(object):
    def __init__(
        self, broker, symbol=None, units=1,
        resample_interval='60s', mean_periods=5
    ):
        """
        A trading platform that trades on one side
            based on a mean-reverting algorithm.

        :param broker: Broker object
        :param symbol: A str object recognized by the broker for trading
        :param units: Number of units to trade
        :param resample_interval:
            Frequency for resampling price time series
        :param mean_periods: Number of resampled intervals
            for calculating the average price
        """
        self.broker = self.setup_broker(broker)

        self.resample_interval = resample_interval
        self.mean_periods = mean_periods
        self.symbol = symbol
        self.units = units

        self.df_prices = pd.DataFrame(columns=[symbol])
        self.pnl, self.upnl = 0, 0
        
        self.bid_price, self.ask_price = 0, 0
        self.position = 0
        self.is_order_pending = False
        self.is_next_signal_cycle = True

    def setup_broker(self, broker):
        #Signal for price, order and position
        broker.on_price_event = self.on_price_event
        broker.on_order_event = self.on_order_event
        broker.on_position_event = self.on_position_event
        return broker

    def on_price_event(self, symbol, bid, ask):
        print(dt.datetime.now(), '[PRICE]', symbol, 'bid:', bid, 'ask:', ask)

        self.bid_price = bid
        self.ask_price = ask
        self.df_prices.loc[pd.Timestamp.now(), symbol] = (bid + ask) / 2.

        self.get_positions()
        self.generate_signals_and_think()

        self.print_state()

    def get_positions(self):
        try:
            self.broker.get_positions()
        except Exception as ex:
            print('get_positions error:', ex)
            
    def on_order_event(self, symbol, quantity, is_buy, transaction_id, status):
        print(
            dt.datetime.now(), '[ORDER]',
            'transaction_id:', transaction_id,
            'status:', status,
            'symbol:', symbol,
            'quantity:', quantity,
            'is_buy:', is_buy,
        )
        if status == 'FILLED': #If the position is filled,
            # There's no other pending order to fill that position
            self.is_order_pending = False
            self.is_next_signal_cycle = False

            self.get_positions()  # Update positions before thinking
            self.generate_signals_and_think()

    def on_position_event(self, symbol, is_long, units, upnl, pnl):
        if symbol == self.symbol:
            self.position = abs(units) * (1 if is_long else -1)
            self.pnl = pnl
            self.upnl = upnl
            self.print_state()

    def print_state(self):
        print(
            dt.datetime.now(), self.symbol, self.position_state,
            abs(self.position), 'upnl:', self.upnl, 'pnl:', self.pnl
        )
        
    @property
    def position_state(self):
        if self.position == 0:
            return 'FLAT'
        if self.position > 0:
            return 'LONG'
        if self.position < 0:
            return 'SHORT'
        
    def generate_signals_and_think(self):
        df_resampled = self.df_prices\
            .resample(self.resample_interval)\
            .ffill()\
            .dropna()
        resampled_len = len(df_resampled.index)

        if resampled_len < self.mean_periods:
            print(
                'Insufficient data size to calculate logic. Need',
                self.mean_periods - resampled_len, 'more.'
            )
            return

        mean = df_resampled.tail(self.mean_periods).mean()[self.symbol]

        # Signal flag calculation
        is_signal_buy = mean > self.ask_price
        is_signal_sell = mean < self.bid_price

        print(
            'is_signal_buy:', is_signal_buy,
            'is_signal_sell:', is_signal_sell,
            'average_price: %.5f' % mean,
            'bid:', self.bid_price,
            'ask:', self.ask_price
        )

        self.think(is_signal_buy, is_signal_sell)
        
    def think(self, is_signal_buy, is_signal_sell):
        if self.is_order_pending:
            #Let the program know that the order is in process
            return
        #The position of the order 
        if self.position == 0:
            self.think_when_position_flat(is_signal_buy, is_signal_sell)
        elif self.position > 0:
            self.think_when_position_long(is_signal_sell)
        elif self.position < 0: 
            self.think_when_position_short(is_signal_buy)        

    def think_when_position_flat(self, is_signal_buy, is_signal_sell):
        #Since the opening price starts at a single value, it essentially starts off flat
        # because nothing has changed yet. 
        if is_signal_buy and self.is_next_signal_cycle:
            print('Opening position, BUY', 
                  self.symbol, self.units, 'units')
            self.is_order_pending = True
            self.send_market_order(self.symbol, self.units, True)
            return

        if is_signal_sell and self.is_next_signal_cycle:
            print('Opening position, SELL', 
                  self.symbol, self.units, 'units')
            self.is_order_pending = True
            self.send_market_order(self.symbol, self.units, False)
            return
        
        #If nothing changes after closing, we have a flat position
        if not is_signal_buy and not is_signal_sell:
            self.is_next_signal_cycle = True
            
    def think_when_position_long(self, is_signal_sell):
        if is_signal_sell:
            print('Closing position, SELL', 
                  self.symbol, self.units, 'units')
            self.is_order_pending = True
            self.send_market_order(self.symbol, self.units, False)

    def think_when_position_short(self, is_signal_buy):
        if is_signal_buy:
            print('Closing position, BUY', 
                  self.symbol, self.units, 'units')
            self.is_order_pending = True
            self.send_market_order(self.symbol, self.units, True)

    def send_market_order(self, symbol, quantity, is_buy):
        self.broker.send_market_order(symbol, quantity, is_buy)

    def run(self):
        self.broker.stream_prices(symbols=[self.symbol])

WARNING! Running the codes below will block on the main thread! You will have to restart the kernel.

In [13]:
trader = MeanReversionTrader(
    broker,
    resample_interval='60s',
    symbol='EUR_USD',
    units=1
)
trader.run()

Price symbol is empty!


Building a trend-following trading platform

In [14]:
class TrendFollowingTrader(MeanReversionTrader):
	def __init__(
		self, *args, long_mean_periods=10,
		buy_threshold=1.0, sell_threshold=1.0, **kwargs
	):
		super(TrendFollowingTrader, self).__init__(*args, **kwargs)

		self.long_mean_periods = long_mean_periods
		self.buy_threshold = buy_threshold
		self.sell_threshold = sell_threshold

	def generate_signals_and_think(self):
		df_resampled = self.df_prices\
			.resample(self.resample_interval)\
			.ffill().dropna()
		resampled_len = len(df_resampled.index)

		if resampled_len < self.long_mean_periods:
			print(
				'Insufficient data size to calculate logic. Need',
				self.mean_periods - resampled_len, 'more.'
			)
			return

		mean_short = df_resampled\
			.tail(self.mean_periods).mean()[self.symbol]
		mean_long = df_resampled\
			.tail(self.long_mean_periods).mean()[self.symbol]
		beta = mean_short / mean_long

		# Signal flag calculation
		is_signal_buy = beta > self.buy_threshold
		is_signal_sell = beta < self.sell_threshold
        
		print(
			'is_signal_buy:', is_signal_buy,
			'is_signal_sell:', is_signal_sell,
			'beta:', beta,
			'bid:', self.bid_price,
			'ask:', self.ask_price
		)

		self.think(is_signal_buy, is_signal_sell)

WARNING! Running the codes below will block on the main thread! You will have to restart the kernel.

In [15]:
trader = TrendFollowingTrader(
    broker,
    resample_interval='60s',
    symbol='EUR_USD',
    units=1,
    mean_periods=5,
    long_mean_periods=10,
    buy_threshold=1.000010,
    sell_threshold=0.99990,
)
trader.run() 

Price symbol is empty!


VaR for risk management

In [16]:
"""
Download the all-time AAPL dataset
"""
from alpha_vantage.timeseries import TimeSeries #module used to get stock data from the Alpha Vantage API
 
# Update your Alpha Vantage API key here...
ALPHA_VANTAGE_API_KEY = 'V1V21XSV6X0FOOD4'
 
ts = TimeSeries(key=ALPHA_VANTAGE_API_KEY, output_format='pandas')
df, meta_data = ts.get_daily_adjusted(symbol='AAPL', outputsize='full')

In [17]:
df.info()

<class 'pandas.core.frame.DataFrame'>
DatetimeIndex: 5448 entries, 2021-06-25 to 1999-11-01
Data columns (total 8 columns):
 #   Column                Non-Null Count  Dtype  
---  ------                --------------  -----  
 0   1. open               5448 non-null   float64
 1   2. high               5448 non-null   float64
 2   3. low                5448 non-null   float64
 3   4. close              5448 non-null   float64
 4   5. adjusted close     5448 non-null   float64
 5   6. volume             5448 non-null   float64
 6   7. dividend amount    5448 non-null   float64
 7   8. split coefficient  5448 non-null   float64
dtypes: float64(8)
memory usage: 383.1 KB


In [18]:
import datetime as dt # Code that displays the date and time
import pandas as pd # Used for high-level indexing
 
# Define the date range
start = dt.datetime(2017, 1, 1)
end = dt.datetime(2017, 12, 31)
 
# Cast indexes as DateTimeIndex objects
df.index = pd.to_datetime(df.index)
closing_prices = df['5. adjusted close']
prices = closing_prices.loc[start:end]

In [19]:
from scipy.stats import norm # open-source software for science, math and engineering

def calculate_daily_var(
    portfolio, prob, mean, 
    stdev, days_per_year=252.
):
    alpha = 1-prob
    u = mean/days_per_year
    sigma = stdev/np.sqrt(days_per_year)
    norminv = norm.ppf(alpha, u, sigma)
    return portfolio - portfolio*(norminv+1)

In [20]:
import numpy as np # Used for working with arrays

portfolio = 100000000.00
confidence = 0.95 # Ah, confidence intervals. This is used to check for potential anomalies
 
# Calculate the mean and standard deviation of the daily returns, and percent change
daily_returns = prices.pct_change().dropna()
mu = np.mean(daily_returns)
sigma = np.std(daily_returns)

In [21]:
VaR = calculate_daily_var(
    portfolio, confidence, mu, sigma, days_per_year=252.)
print('Value-at-Risk: %.2f' % VaR)

Value-at-Risk: nan
