In [16]:
import pandas as pd
import numpy as np
import requests
import os
import re
import tarfile
import zipfile
import bz2
import glob
import logging
import yaml

from datetime import date, timedelta
from unittest.mock import patch
from typing import List, Set, Dict, Tuple, Optional
from itertools import zip_longest
import betfairlightweight
from betfairlightweight import StreamListener
from betfairlightweight.resources.bettingresources import (
    PriceSize,
    MarketBook
)

# Utility Functions
# _________________________________

def as_str(v) -> str:
    """Render a value as text for output.

    Exact ``float`` instances are formatted with two decimals, exact ``str``
    instances are returned unchanged, and every other type yields ``''``.
    """
    kind = type(v)
    if kind is float:
        return '%.2f' % v
    if kind is str:
        return v
    return ''

def split_anz_horse_market_name(market_name: str) -> Tuple[str, str, str]:
    """Split an ANZ horse-racing market name into its first three parts.

    The name is split on single spaces and the first three tokens are returned.

    Args:
        market_name: Market name, e.g. ``'R6 1400m Grp1'``.

    Returns:
        ``(race_no, race_len, race_type)``, e.g. ``('R6', '1400m', 'grp1')``.
        Only ``race_type`` is lower-cased (examples: ``grp1``, ``trot``, ``pace``).

    Raises:
        IndexError: If the name has fewer than three space-separated tokens.
    """
    parts = market_name.split(' ')
    race_no = parts[0]  # e.g. R6
    race_len = parts[1]  # e.g. 1400m
    race_type = parts[2].lower()  # e.g. grp1, trot, pace
    return (race_no, race_len, race_type)

def filter_market(market: MarketBook) -> bool:
    """Decide whether a market should be processed.

    A market passes when its definition has country code ``'AU'``, market type
    ``'WIN'``, and a race type (third token of the market name) that is neither
    ``'trot'`` nor ``'pace'``.
    """
    definition = market.market_definition
    if not definition.country_code == 'AU':
        return False
    if not definition.market_type == 'WIN':
        return False
    # The name is only parsed once the cheaper checks have passed.
    race_type = split_anz_horse_market_name(definition.name)[2]
    return race_type != 'trot' and race_type != 'pace'

def load_markets(file_paths):
    """Yield bz2-decompressing file objects for every market file under the given paths.

    Each entry of ``file_paths`` may be:

    * a directory -- searched recursively for ``*.bz2`` files;
    * a ``.tar`` archive -- every regular-file member is opened as bz2;
    * a ``.zip`` archive -- every non-directory entry is opened as bz2.

    Any other path is silently ignored. Each path is printed before it is
    processed.

    This is a generator. A yielded file object is closed as soon as the
    consumer requests the next one (or closes the generator), so it must be
    fully read before advancing.

    Args:
        file_paths: Iterable of filesystem paths.

    Yields:
        Binary file objects producing the decompressed content of each file.
    """
    for file_path in file_paths:
        print(file_path)
        if os.path.isdir(file_path):
            # os.path.join guarantees a separator between the directory and the
            # recursive wildcard, so "data" cannot also match "data_other".
            pattern = os.path.join(file_path, '**', '*.bz2')
            for path in glob.iglob(pattern, recursive=True):
                with bz2.BZ2File(path, 'rb') as f:
                    yield f
        elif os.path.isfile(file_path):
            ext = os.path.splitext(file_path)[1]
            # iterate through a tar archive
            if ext == '.tar':
                with tarfile.TarFile(file_path) as archive:
                    for member in archive:
                        raw = archive.extractfile(member)
                        # extractfile() returns None for directories and other
                        # non-file members; there is nothing to decompress.
                        if raw is None:
                            continue
                        with raw, bz2.open(raw) as f:
                            yield f
            # or a zip archive
            elif ext == '.zip':
                with zipfile.ZipFile(file_path) as archive:
                    for info in archive.infolist():
                        if info.is_dir():
                            continue
                        with archive.open(info) as raw, bz2.open(raw) as f:
                            yield f

def slicePrice(l, n):
    """Return the ``price`` of rung ``n`` of ladder ``l``, or NaN if unavailable.

    Args:
        l: Indexable ladder whose items expose a ``price`` attribute.
        n: Index of the rung to read.

    Returns:
        ``l[n].price``, or ``np.nan`` when the lookup fails for any ordinary
        reason (ladder too short, ladder is ``None``, item has no ``price``).
    """
    try:
        return l[n].price
    except Exception:
        # Best-effort lookup. ``Exception`` rather than a bare ``except`` so
        # that KeyboardInterrupt / SystemExit still stop a long-running cell.
        return np.nan

def sliceSize(l, n):
    """Return the ``size`` of rung ``n`` of ladder ``l``, or NaN if unavailable.

    Args:
        l: Indexable ladder whose items expose a ``size`` attribute.
        n: Index of the rung to read.

    Returns:
        ``l[n].size``, or ``np.nan`` when the lookup fails for any ordinary
        reason (ladder too short, ladder is ``None``, item has no ``size``).
    """
    try:
        return l[n].size
    except Exception:
        # Best-effort lookup. ``Exception`` rather than a bare ``except`` so
        # that KeyboardInterrupt / SystemExit still stop a long-running cell.
        return np.nan

def wapPrice(l, n):
    """Size-weighted average price over the leading rungs of a ladder.

    The average is taken over ``l[0:n-1]`` and rounded to two decimals.

    NOTE(review): the slice stops at ``n - 1``, so ``n=3`` averages the first
    two rungs and ``n=1`` always yields NaN. This looks like an off-by-one, but
    the existing behaviour is kept so previously produced numbers do not
    change -- confirm the intended depth before altering it.

    Args:
        l: Sliceable ladder whose items expose ``price`` and ``size``.
        n: Depth parameter (see note above).

    Returns:
        The rounded weighted average, or ``np.nan`` when it cannot be computed
        (empty slice, zero total size, missing ladder or attributes).
    """
    try:
        rungs = l[0:(n - 1)]
        weighted_total = sum([rung.price * rung.size for rung in rungs])
        total_size = sum([rung.size for rung in rungs])
        return round(weighted_total / total_size, 2)
    except Exception:
        # Best-effort calculation. ``Exception`` rather than a bare ``except``
        # so that KeyboardInterrupt / SystemExit still stop a long-running cell.
        return np.nan

def ladder_traded_volume(ladder):
    """Add up the ``size`` attribute of every rung in ``ladder``."""
    sizes = (step.size for step in ladder)
    return sum(sizes)


In [17]:
# Load credentials from a YAML file three directories above the notebook; the
# file must provide the keys 'uid', 'pwd' and 'api_key'.
with open("../../../secrets.yaml", 'r') as stream:
    creds = yaml.safe_load(stream)

# API client used by `parse_stream` (via `trading.streaming`) to build
# historical streams. No login call is made in this cell.
trading = betfairlightweight.APIClient(creds['uid'], creds['pwd'],  app_key=creds["api_key"])

# Listener shared by every historical stream created in `parse_stream`.
# NOTE(review): max_latency=None presumably turns off the latency check, which
# would otherwise fire on historical data -- confirm in the library docs.
listener = StreamListener(max_latency=None)

In [19]:
def extract_components_from_stream(s):
    """Replay one historical market stream and pick out a handful of snapshots.

    Args:
        s: Stream object exposing ``get_generator()``; calling the returned
            generator yields lists of market books.

    Returns:
        A 6-tuple ``(t5m, t30s, preplay, postplay, inplay_min_lay, final)``:

        * ``t5m`` / ``t30s`` -- first market book published less than 5 minutes /
          30 seconds before ``market_definition.market_time`` (``None`` if no
          such book was seen).
        * ``preplay`` -- most recent book whose ``inplay`` flag differs from the
          previous book's; falls back to ``postplay`` when the flag never changed.
        * ``postplay`` -- most recent book whose status differs from an
          ``"OPEN"`` previous book (``None`` if that never happened).
        * ``inplay_min_lay`` -- per-runner running minimum of the best lay price
          over all in-play books (a list after the first in-play book, a numpy
          array after ``np.fmin``); a list of empty strings when ``preplay`` had
          to fall back to ``postplay``; ``None`` otherwise.
        * ``final`` -- the last market book seen.

        All six values are ``None`` when the first market book is rejected by
        ``filter_market``.
    """
    
    # While replaying, builtins.open is replaced by a function that returns its
    # first argument unchanged.
    # NOTE(review): presumably this lets the library "open" the already-open
    # file object that was passed as file_path -- confirm against the library.
    with patch("builtins.open", lambda f, _: f):   
    
        # Snapshots collected during the replay (see the docstring for meanings).
        evaluate_market = None
        prev_market = None
        postplay = None
        preplay = None
        t5m = None
        t30s = None
        inplay_min_lay = None

        gen = s.get_generator()

        for market_books in gen():
            
            for market_book in market_books:

                # The filter is evaluated once, on the first market book only;
                # a rejected market returns six Nones immediately.
                if evaluate_market is None and ((evaluate_market := filter_market(market_book)) == False):
                    return (None, None, None, None, None, None)

                # Book on which the inplay flag changed relative to the previous book.
                # Note this stores the current book, not the one before the change.
                if prev_market is not None and prev_market.inplay != market_book.inplay:
                    preplay = market_book

                # Book on which the status moved away from "OPEN"; overwritten on
                # every such transition, so the last one wins.
                if prev_market is not None and prev_market.status == "OPEN" and market_book.status != prev_market.status:
                    postplay = market_book

                # Seconds until the scheduled market start time (negative once past it).
                seconds_to_start = (market_book.market_definition.market_time - market_book.publish_time).total_seconds()
                    
                # First book published less than 30 seconds before the scheduled start.
                if t30s is None and seconds_to_start < 30:
                    t30s = market_book

                # First book published less than 5 minutes before the scheduled start.
                if t5m is None and seconds_to_start < 5*60:
                    t5m = market_book

                # Track the lowest best-lay price per runner while in play.
                # np.fmin prefers the non-NaN operand, so runners with no lay
                # price in a given book keep their previous minimum.
                if market_book.inplay:

                    if inplay_min_lay is None:
                        inplay_min_lay = [ slicePrice(runner.ex.available_to_lay,0) for runner in market_book.runners]
                    else:
                        inplay_min_lay = np.fmin(inplay_min_lay, [ slicePrice(runner.ex.available_to_lay,0) for runner in market_book.runners])

                # update reference to previous market
                prev_market = market_book

        # The inplay flag never changed: reuse the post-play book as the pre-play
        # snapshot and blank out the in-play minimums (one '' per runner of the
        # last book seen).
        if postplay is not None and preplay is None:
            preplay = postplay
            inplay_min_lay = ["" for runner in market_book.runners]

        return (t5m, t30s, preplay, postplay, inplay_min_lay, prev_market) # Final market is last prev_market


def parse_stream(dir):
    """Print one CSV-style row per non-removed runner for every market under ``dir``.

    For each market file produced by ``load_markets`` a historical stream is
    built with the module-level ``trading`` client and ``listener``, replayed
    through ``extract_components_from_stream``, and summarised per runner.

    Each printed row holds, in order: market id, selection id, selection name,
    WAP of the back ladder and of the lay ladder on the T-30s snapshot,
    ``sp.actual_sp``, last price traded on the pre-play snapshot, rounded
    traded volume on the post-play snapshot, and the in-play minimum best-lay
    price. Every row is followed by a blank line because the format string
    itself ends in a newline.

    Markets with no post-play snapshot (including markets rejected by the
    filter) are skipped. When a market has no T-30s snapshot the two WAP
    columns are NaN instead of the run aborting.

    Args:
        dir: Iterable of paths accepted by ``load_markets``. The name shadows
            the ``dir`` builtin but is kept for backward compatibility.
    """
    for file_obj in load_markets(dir):

        stream = trading.streaming.create_historical_generator_stream(
            file_path=file_obj,
            listener=listener,
        )

        (_t5m, t30s, preplay, postplay, inplay_min_lay, final) = extract_components_from_stream(stream)

        # If no price data for market don't write a row
        if postplay is None:
            continue

        runner_meta = [
            {
                'selection_id': r.selection_id,
                'selection_name': next((rd.name for rd in final.market_definition.runners if rd.selection_id == r.selection_id), None),
                'selection_status': r.status,
                'sp': r.sp.actual_sp
            }
            for r in final.runners
        ]

        last_traded = [runner.last_price_traded for runner in preplay.runners]

        traded_volumes = [ladder_traded_volume(runner.ex.traded_volume) for runner in postplay.runners]

        if t30s is None:
            # The stream never reached 30 seconds before the scheduled start;
            # keep the row and mark the WAP columns as missing.
            wap_back_30s = [np.nan] * len(runner_meta)
            wap_lay_30s = [np.nan] * len(runner_meta)
        else:
            wap_back_30s = [wapPrice(runner.ex.available_to_back, 3) for runner in t30s.runners]
            wap_lay_30s = [wapPrice(runner.ex.available_to_lay, 3) for runner in t30s.runners]

        # Writing rows
        # ______________________

        # NOTE(review): zipping assumes every snapshot lists runners in the
        # same order -- confirm for markets with late removals.
        rows = zip(runner_meta, last_traded, traded_volumes, inplay_min_lay, wap_back_30s, wap_lay_30s)

        for meta, ltp, traded_vol, inplay_min, wap_back, wap_lay in rows:

            if meta['selection_status'] == 'REMOVED':
                continue

            print(
                "{},{},{},{},{},{},{},{},{}\n".format(
                    str(final.market_id),
                    meta['selection_id'],
                    meta['selection_name'],
                    wap_back,
                    wap_lay,
                    meta['sp'],
                    ltp,
                    round(traded_vol),
                    inplay_min
                )
            )


# Run the extraction over a single archive; rows are printed to the cell output.
parse_stream(["/media/hdd/data/betfair-stream/thoroughbred/2021_06_JunRacingAUPro.tar"])

/media/hdd/data/betfair-stream/thoroughbred/2021_06_JunRacingAUPro.tar
1.183995724,22832649,1. Ablestock,8.28,8.66,8.4,8.4,4071,6.6

1.183995724,13688381,2. Annunciate,2.07,2.12,2.3,2.32,30106,1.01

1.183995724,2659734,3. Eyes Are Blue,221.92,300.7,382.87,320.0,117,680.0

1.183995724,11781811,4. High Rolla,186.27,235.36,171.86,170.0,236,100.0

1.183995724,18740780,5. Kapuziner,61.21,71.95,70.0,70.0,426,19.0

1.183995724,39611435,6. Mick N Me,90.87,115.58,85.24,85.0,433,120.0

1.183995724,15537907,7. Noumea,62.4,77.61,75.74,80.0,398,9.0

1.183995724,19741292,8. Thunder Cloud,4.24,4.46,4.4,4.5,11322,3.0

1.183995724,33957114,10. Devon Miss,65.85,77.38,60.0,55.0,564,8.0

1.183995724,11192947,11. Fairlight,8.03,8.52,7.72,7.8,4176,7.4

1.183995726,12662979,1. Aloft,20.12,22.75,24.0,24.0,1833,24.0

1.183995726,5450942,2. Buffalo Bill,34.75,39.65,41.1,42.0,967,29.0

1.183995726,19674605,3. By Design,11.63,12.74,12.26,12.0,3807,1.01

1.183995726,27140232,4. Dourekn,277.38,484.71,345.16,200.0,9

KeyboardInterrupt: 

In [2]:
import glob
# Collect every .tar archive in the thoroughbred stream folder.
# glob.glob does not sort its results, so the listing order is arbitrary.
stream_files = glob.glob("/media/hdd/data/betfair-stream/thoroughbred/*.tar")
stream_files

['/media/hdd/data/betfair-stream/thoroughbred/2021_03_MarRacingAUPro.tar',
 '/media/hdd/data/betfair-stream/thoroughbred/2020_11_NovRacingAUPro.tar',
 '/media/hdd/data/betfair-stream/thoroughbred/2021_05_MayAURacingPro.tar',
 '/media/hdd/data/betfair-stream/thoroughbred/2020_09_SepRacingAUPro.tar',
 '/media/hdd/data/betfair-stream/thoroughbred/2021_01_JanRacingAUPro.tar',
 '/media/hdd/data/betfair-stream/thoroughbred/2021_04_AprRacingAUPro.tar',
 '/media/hdd/data/betfair-stream/thoroughbred/2021_02_FebRacingAUPro.tar',
 '/media/hdd/data/betfair-stream/thoroughbred/2020_10_OctRacingAUPro.tar',
 '/media/hdd/data/betfair-stream/thoroughbred/2021_06_JunRacingAUPro.tar',
 '/media/hdd/data/betfair-stream/thoroughbred/2020_12_DecRacingAUPro.tar',
 '/media/hdd/data/betfair-stream/thoroughbred/2020_07_JulRacingAUPro.tar',
 '/media/hdd/data/betfair-stream/thoroughbred/2020_08_AugRacingAUPro.tar']