# Ticker client

A notebook demonstrating asynchronous algorithms to maintain a "local" copy of the ticker data from the server.

We detail algorithms for two different levels of hypothetical REST API support:
- the exchange supports information on all tickers being downloaded in one request.
- the exchange supports only a single ticker's information being downloaded in one request.

Due to the ubiquity of REST API rate limitations, in the latter case it is usually infeasible to develop real-time information on all markets at once using the REST API alone.

In both cases, the REST and websocket APIs can be combined to maintain real-time information on all markets.

In [1]:
import aiohttp
import ast
import asyncio
import coinblockpro
import copy
import nest_asyncio
import time
import websockets

nest_asyncio.apply()  # lets asyncio be run inside Tornado's event loop

### Full-ticker

The full-ticker algorithm first establishes the websocket client, and builds a buffer of the markets it has acuired information for, keeping only the latest.

After the websocket connection is established, the REST API is contacted a single time to retrieve the ticker information. The buffer of websocket data is used to update the ticker information from the REST API.

From this point on, so long as the websocket is maintained, the market information will be up-to-date.

Here, the ticker information is being written to memory only and therefore without an asyncio primitive locking the ticker_dict class property, no race conditions would normally apply. We, however, include an explicit lock to demonstrate where one should go in the event the data is being exported to a shared resource that does not handle race conditions well.

In [2]:
class FullTicker:
    """Class to monitor an exchange's markets where the exchange
    offers the ability to download info on all tickers at once over
    the REST API.
    """
    
    def __init__(self):
        self.ticker_data = {}  # market: price
        self.ticker_data_lock = None
        self.rest_data = None
        self.rest_event = None

    async def get_full_ticker(self):
        """Coroutine to fetch the full ticker over the REST API.
        Only run after the event indicating websocket connection.
        """       
        await self.rest_event.wait()
        url = 'http://0.0.0.0:8000/full_ticker'
        async with self.ticker_data_lock:
            async with aiohttp.ClientSession() as session:
                async with session.get(url) as response:
                    payload = await response.json()
                    if not response.status == 200:
                        raise ValueError('Request failed!')
                    if not 'result' in payload:
                        raise ValueError('Request error.')
                    rest_data = payload['result']
                    self.insert_rest_values(rest_data)
        print('We now have information on all markets!')
            
    def insert_rest_values(self, data):
        """Blocking function to insert the rest API data into the 
        ticker_data container.
        """
        print('Assigning full ticker REST data')
        temp_dict = copy.deepcopy(self.ticker_data)
        self.ticker_data = data
        for k, v in temp_dict.items():
            self.ticker_data[k] = v
            
    async def subscribe(self, period):
        """Coroutine to subscribe to the websocket ticker feed for 
        the given period (in seconds).
        """
        print('Establishing websocket feed')
        time_now = time.time()
        # Lock the REST API from fetching the ticker data until the
        # websocket connection is receiving data.
        async with websockets.connect('ws://0.0.0.0:8001/ticker_feed') as ws:
            self.rest_event.set()
            while time.time() < time_now + period:
                async with self.ticker_data_lock:
                    resp = await ws.recv()                
                    my_dict = ast.literal_eval(resp)
                    for k, v in my_dict.items():
                        print(f'Websocket: market {k} to price {v}')
                        self.ticker_data[k] = v
            print('Terminating websocket connection.')
            
    async def monitor_markets(self, period):
        """Monitor markets using the REST and websocket APIs."""
        self.rest_event = asyncio.Event()
        self.ticker_data_lock = asyncio.Lock()
        await asyncio.gather(
            self.get_full_ticker(),
            self.subscribe(period)
        )

Now we will run this algorithm for 10 seconds.

In [3]:
f = FullTicker()
asyncio.run(f.monitor_markets(10))

Establishing websocket feed
Websocket: market etc_dot to price 7533.094
Assigning full ticker REST data
We now have information on all markets!
Websocket: market ltc_eur to price 6806.274
Websocket: market ltc_xrp to price 1929.749
Websocket: market etc_xrp to price 7381.192
Websocket: market eur_usd to price 471.197
Websocket: market eth_dot to price 3338.11
Websocket: market etc_usd to price 8381.453
Websocket: market ltc_dot to price 6875.412
Websocket: market xlm_usd to price 8549.784
Websocket: market btc_usd to price 7947.607
Websocket: market etc_xlm to price 8902.866
Websocket: market ltc_xlm to price 3145.784
Websocket: market ltc_bch to price 9570.648
Websocket: market eth_xrp to price 7451.928
Websocket: market btc_eth to price 7368.568
Websocket: market etc_usd to price 8126.294
Websocket: market etc_dot to price 7707.904
Websocket: market ltc_eth to price 4826.727
Terminating websocket connection.


### Single ticker

Again, we implement the websocket connection first, using it to begin populating a list of markets.

To gather information on markets seeing little or no activity, and therefore not having their information broadcast on the websocket connections, we run an algorithm in tandem with the websocket connection using the REST API.

Periodically, with some delay to avoid request rate limitations (0.5s, but would likely have to be a longer duration in a real-world scenario), we request information on a single market for which we have no available information.

While this happens, the websocket information is buffered, such that if information comes in after the request is made, which could be more up-to-date than the information in the request response, it will overwrite the request information. This runs until we have information on all markets, at which point the websocket client alone is sufficient to provide real-time information.

In [4]:
class SingleTicker:
    """Class to monitor an exchange's markets where the exchange
    offers the ability to download info on only one ticker at once over
    the REST API.
    """
    
    def __init__(self):
        self.ticker_data = {}  # market: price
        self.ticker_data_lock = None
        self.rest_data = None
        self.rest_event = None
        
    async def get_single_ticker(self, market):
        """Coroutine to fetch the ticker for the given market over the REST API.
        Can only be run after the event indicating websocket connection.
        """       
        url = 'http://0.0.0.0:8000/single_ticker'
        
        async with aiohttp.ClientSession() as session:
            async with session.get(url, params={'market': market}) as response:
                payload = await response.json()
                if not response.status == 200:
                    raise ValueError('Request failed!')
                if not 'result' in payload:
                    raise ValueError('Request error.')
                rest_data = payload['result']
                for k, v in rest_data.items():
                    print(f'REST: market {k} to price {v}')
                self.insert_rest_values(rest_data)
                
    def insert_rest_values(self, data):
        """Blocking function to insert the rest API data into the 
        ticker_data container.
        """
        temp_dict = copy.deepcopy(self.ticker_data)
        self.ticker_data = data
        self.ticker_data.update(temp_dict)
        
    async def subscribe(self, period):
        """Coroutine to subscribe to the websocket ticker feed for 
        the given period (in seconds).
        """
        print('Establishing websocket connection.')
        time_now = time.time()
        # Lock the REST API from fetching the ticker data until the
        # websocket connection is receiving data.
        async with websockets.connect('ws://0.0.0.0:8001/ticker_feed') as ws:
            self.rest_event.set()
            while time.time() < time_now + period:
                async with self.ticker_data_lock:
                    resp = await ws.recv()                
                    my_dict = ast.literal_eval(resp)
                    for k, v in my_dict.items():
                        print(f'Websocket: market {k} to price {v}')
                        self.ticker_data[k] = v
            print('Terminating websocket connection.')
    
    async def get_missing_markets(self):
        """Coroutine to get any missing market data."""
        await self.rest_event.wait()

        market_list = coinblockpro.markets
        markets_to_get = [m for m in market_list if 
                          m not in self.ticker_data.keys()]
        counter = 0
        while markets_to_get:
            # Lock the ticker_data until the request is dealt with,
            # to prevent the rest API overwriting a more up-to-date
            # price from the websocket.
            async with self.ticker_data_lock:
                # Fetch a market we don't have.
                market = markets_to_get[0]
                await self.get_single_ticker(market)
                counter += 1
            # Sleep to avoid rate limits.
            await asyncio.sleep(0.5)
            markets_to_get = [m for m in market_list if 
                              m not in self.ticker_data.keys()]

        print('All markets now have current information!')
        total = len(market_list)
        print(f'Downloaded information on {counter} / {total}'
              ' markets through the REST API.')
        
    async def monitor_markets(self, period):
        """Monitor markets using the REST and websocket APIs."""
        self.rest_event = asyncio.Event()
        self.ticker_data_lock = asyncio.Lock()
        await asyncio.gather(
            self.get_missing_markets(),
            self.subscribe(period)
        )

We will now run this algorithm for 50 seconds.

In [5]:
s = SingleTicker()
asyncio.run(s.monitor_markets(50))

Establishing websocket connection.
Websocket: market eur_usd to price 474.683
REST: market btc_ltc to price 1354.712
Websocket: market ltc_bch to price 9422.766
REST: market btc_eth to price 7368.568
Websocket: market dot_usd to price 4303.309
REST: market btc_etc to price 9694.483
Websocket: market bch_eur to price 854.566
REST: market btc_xlm to price 7712.736
Websocket: market dot_eur to price 5672.834
REST: market btc_xrp to price 61.412
Websocket: market eth_eur to price 3007.273
Websocket: market xlm_xrp to price 6493.124
REST: market btc_bch to price 5790.396
Websocket: market xlm_eur to price 5028.015
Websocket: market bch_dot to price 381.519
REST: market btc_dot to price 4579.043
Websocket: market btc_etc to price 9773.172
REST: market btc_eur to price 1389.045
Websocket: market btc_xlm to price 8073.707
REST: market btc_usd to price 7947.607
Websocket: market ltc_etc to price 4199.112
REST: market ltc_eth to price 4826.727
Websocket: market btc_usd to price 8049.302
REST: ma