TODO:

- Notional amount:  What to do if this is 0?
  - Skip security as it has likely already redeemed? or
  - Ignore nominal amount?

In [1]:
# Imports 
import logging
import re
from datetime import datetime, timedelta
from zipfile import ZipFile
from os import mkdir, listdir
from os.path import join, exists, dirname, realpath
from typing import List, Set, Tuple, Dict, Collection, Any, Callable, Union, NewType, Iterable, Optional
from io import BytesIO
from threading import Thread
from multiprocessing import Pool, Manager

from dateutil import parser as dateparser
from dateutil.tz import tzutc

from lxml import etree
import requests

In [2]:
# Constants

DATA_DIR = 'data_files'

today_utc = datetime.utcnow().replace(tzinfo=tzutc())
today = datetime.today()

In [3]:
"""Functions for fetching FIRDS files"""

    
Q_URL = ('https://registers.esma.europa.eu/solr/esma_registers_firds_files/'
        'select?q=*&fq=publication_date:%5B{from_year}-{from_month}-'
        '{from_day}T00:00:00Z+TO+{to_year}-{to_month}-{to_day}T23:59:59Z%5D'
        '&wt=xml&indent=true&start=0&rows=100')

FNAME_START = 'FULINS_{}'
    
def get_file_urls(from_date: datetime = None, to_date: datetime = None, ftype: str = '') -> List[str]:
    if from_date is None:
        to_date = datetime.today()
        from_date = to_date - timedelta(weeks=1)
    elif to_date is None:
        to_date = from_date
        
    url = Q_URL.format(
        from_year=from_date.year,
        from_month=from_date.month,
        from_day=from_date.day,
        to_year=to_date.year,
        to_month=to_date.month,
        to_day=to_date.day
    )
    response = requests.get(url)
    response.raise_for_status()
    #print(response.content)
    root = etree.fromstring(response.content)
    #print(list(root[1]))
    urls = []
    for entry in root[1]:
        if entry[6].text.startswith(FNAME_START.format(ftype)): # File name
            urls.append(entry[1].text) # URL
    return urls
    
def download_zipped_file(url: str, to_dir: str = None) -> str:
    if to_dir is None:
        to_dir = DATA_DIR
    response = requests.get(url)
    response.raise_for_status()
    zipfile = ZipFile(BytesIO(response.content))
    name = zipfile.namelist()[0]
    zipfile.extractall(path=to_dir)
    return join(to_dir, name)
    
def download_xml_files(from_date: datetime = None, to_date: datetime = None, to_dir: str = None, ftype: str = '') -> List[str]:
    fpaths = []
    for fpath in get_file_urls(from_date, to_date, ftype=ftype):
        fpaths.append(download_zipped_file(fpath, to_dir))
    return fpaths
    
def get_xml_files(data_dir: str = None, force_dl: bool = False, ftype: str = '') -> List[str]:
    logging.info('Getting FIRDS XML files.')
    if data_dir is None:
        data_dir = DATA_DIR
    xml_files = [join(data_dir, f) for f in listdir(data_dir) if (f.startswith(FNAME_START.format(ftype)) and f.endswith('.xml'))]
    if (not xml_files) or force_dl:
        for f in xml_files:
            remove(f)
        return download_xml_files(to_dir=data_dir, ftype=ftype)
    else:
        return xml_files
    

    

In [4]:
debt_files = get_xml_files(ftype='D')

In [5]:
# See:
# https://www.esma.europa.eu/sites/default/files/library/esma65-11-1193_firds_reference_data_reporting_instructions_v2.1.pdf
#
# XML structure (ignoring irrelevant nodes):
# - BizData (root)
#   - Hdr
#   - Pyld (root[1])
#     - Document (Pyld[0])
#       - FinInstrmRptgRefDataRpt (Document[0])
#         - RptHdr
#         - RefData (FinInstrmRptgRefDataRpt[1:]) (repeats for each ISIN)
#           - FinInstrmGnlAttrbts (RefData[0])
#             - Id (FinInstrmGnlAttrbts[0]): text = ISIN
#             - NtnlCcy (FinInstrmGnlAttrbts[4]): text = notional currency
#           - Issr (RefData[1]): text = issuer LEI
#           - TradgVnRltdAttrbts (RefData[2])
#             - Id (TradgVnRltdAttrbts[0]): text = trading venue MIC
#           - DebtInstrmAttrbts (RefData[3])
#             - TtlIssdNmnlAmt (DebtInstrmAttrbts[0]): text = total issued nominal amount
#             - MtrtyDt (DebtInstrmAttrbts[1]): text = maturity date in '%Y-%m-%d' format
#             - NmnlValPerUnit (DebtInstrmAttrbts[2]): text = minimum denomination
#             - IntrstRate (DebtInstrmAttrbts[3]):
#               - Fxd: text = interest rate (or None)
#               OR
#               - Fltg
#                 - RefRate (Fltg[0])
#                   - Nm (RefRate[0]): text = reference rate name
#                   OR
#                   - ISIN (RefRate[0]): text = ISIN
#                   OR
#                   - Indx (RefRate[0]): text = index (?)
#                   
#                 - Term (Fltg[1])
#                   - Unit (Term[0]): text = unit of time (eg, 'MNTH')
#                   - Val (Term[1]): text = term as number of units
#                 - BsisPtSprd (Fltg[2]): text = basis point spread
#           - TechAttrbts (RefData[4])
#             - RlvntCmptntAuthrty (TechAttrbts[0]): text = country of RCA
#
# See also:
# https://www.esma.europa.eu/sites/default/files/library/esma70-1861941480-56_qas_mifir_data_reporting.pdf

day = timedelta(days=1)

def find_by_isin(isin: str, elems: Iterable[etree._Element]) -> etree._Element:
    for e in elems:
        if get_isin(e) == isin:
            return e

def get_isin(elem: etree._Element) -> str:
    return elem[0][0].text
      
def get_maturity_date(elem:etree._Element) -> datetime:
    return datetime.strptime(elem[3][1].text, '%Y-%m-%d')
    
def get_maturity(elem: etree._Element, from_date: datetime = datetime.today()) -> float:
    maturity = ((get_maturity_date(s) - from_date) / day) / 354.25
    return maturity

def get_tv_dates(elem: etree._Element) -> Tuple[datetime, datetime]:
    tv_data = elem[2]
    first_trade = None
    termination = None
    for datum in tv_data:
        if datum.tag.endswith('}FrstTradDt'):
            first_trade = dateparser.isoparse(datum.text)
        elif datum.tag.endswith('}TermntnDt'):
            termination = dateparser.isoparse(datum.text)
    return first_trade, termination

def get_notional_amount(elem: etree._Element) -> float:
    return float(elem[3][0].text)

def get_currency(elem: etree._Element, convert_DEM: bool = True) -> str:
    currency = elem[0][4].text
    if (currency == 'DEM') and convert_DEM:
        return 'EUR'
    else:
        return currency

def get_interest_rate(elem: etree._Element) -> dict:
    ir_elem = elem[3][3][0]
    ir_data = {}
    if ir_elem.tag.endswith('}Fxd'):
        ir_data['fixed_floating'] = 'fixed'
        ir_data['rate'] = ir_elem.text
    elif ir_elem.tag.endswith('}Fltg'):
        ir_data['fixed_floating'] = 'floating'
        ref_rate = ir_elem[0]
        if ref_rate[0].tag.endswith('}Nm'):
            ir_data['index_name'] = ref_rate[0].text
        elif ref_rate[0].tag.endswith('}ISIN'):
            ir_data['index_isin'] = ref_rate[0].text
        elif ref_rate[0].tag.endswith('}Indx'):
            ir_data['index_code'] = ref_rate[0].text
        ir_data['term'] = (ir_elem[1][0].text, float(ir_elem[1][1].text))
        ir_data['spread'] = float(ir_elem[2].text)
    else:
        raise ValueError('Found unexpected interest rate: {}.'.format(ir_elem.tag))
    return ir_data

def is_floating(elem: etree._Element) -> bool:
    return get_interest_rate(elem)['fixed_floating'] == 'floating'


def print_details(elem: etree._Element, float_only: bool = False):
    if not float_only:
        if get_interest_rate(elem)['fixed_floating'] != 'floating':
            return
    print('ISIN: {}'.format(get_isin(elem)))
    print('Notional amount: {} {}'.format(get_currency(elem), get_notional_amount(elem)))
    print('Maturity: {} years'.format(get_maturity(elem)))
    print('Interest rate: {}'.format(get_interest_rate(elem)))


In [6]:
# TODO: Add ISINs from
# https://www.eurexrepo.com/resource/blob/309066/a9347c3456795e9f4c82730c04567f39/data/product_specs_seclend_de_20171211.pdf

libors = (
    
    {
        'root_names': (
            ('GBP', 'LIBOR'),
            ('STERLING', 'LIBOR')
        ),
        'isins': {
            'GB00BD080045',
            'GB00BD07ZZ10',
            'GB0003117685',
            'GB0009655183'
        },
        'code': 'LIBO',
        'names': {
            'GBP LIBOR',
            'BP0003M'
        },
        'currency': 'GBP'
    },
    
    {
        'root_names': (
            ('USD', 'LIBOR'),
        ),
        'isins': {
            'GB00BD080714',
            'GB0003758389',
            'GB00BD080607',
            'GB00BD080821',
            'GB00BD080938',
            'GB0003766598',
            'GB0003764668',
        },
        'code': 'LIBO',
        'names': {
            'USD LIBOR',
            'US0003M',
            'OFFERED USD RATE',
            '1MTH USD',
            '30DAY USD',
            '3MTH USD',
            '6MTH USD',
            '180DAY USD',
            '90DAY USD'
        },
        'currency': 'USD'
    },
    
    {
        'root_names': (
            ('CHF', 'LIBOR'),
            ('SWISS', 'FRANC', 'LIBOR')
        ),
        'isins': {
            'GB00BD080F90',
            'GB00BD080C69',
        },
        'code': 'LIBO',
        'names': {
            'CHF LIBOR'
        },
        'currency': 'CHF'
    },
    
    {
        'root_names': (
            ('EUR', 'LIBOR'),
            ('EURO', 'LIBOR')
        ),
        'isins': {
            'GB00BD080482'
        },
        'code': 'LIBO',
        'names': {
            'EUR LIBOR'
        },
        'currency': 'EUR'
    },
    
    {
        'root_names': (
            ('JPY', 'LIBOR'),
            ('YEN', 'LIBOR')
        ),
        'isins': set(),
        'code': 'LIBO',
        'names': {
            'JPY LIBOR'
        },
        'currency': 'JPY'
        
    },

    {
        'root_names': (
            ('LIBOR',),
        ),
        'isins': set(),
        'code': 'LIBO',
        'names': {
            'LIBOR'
        },
        'currency': None
    }

)

non_libors = {
    
    'EURIBOR': {
        'root_names': (
            ('EURIBOR',),
            ('EUR', 'INTERBANK', 'OFFERED')
        ),
        'isins': {
            'EU0009652783',
            'EU0009659937',
            'EU0009652791',
            'EU000A0X7136'
        },
        'code': 'EURI',
        'names': {
            'EURIBOR',
            'EURI',
            'EUROBOR',
            '1YR EUR',
            '180Day EUR'
        },
        'currency': 'EUR'
    },

    'EONIA': {
        'root_names': (
            ('EONIA',),
            ('EURO', 'OVERNIGHT', 'INDEX', 'AVERAGE')
        ),
        'isins': {
            'EU0009659945'
        },
        'code': 'EONA',
        'names': {
            'EONIA',
            'EURO OVERNIGHT INDEX AVERAGE'
        },
        'currency': 'EUR'
    },

    'SONIA': {
        'root_names': (
            ('SONIA',),
            ('STERLING', 'OVERNIGHT', 'INDEX', 'AVERAGE')
        ),
        'isins': {
            'GB00B56Z6W79'
        },
        'code': False,
        'names': {
            'SONIA',
            'STERLING OVERNIGHT INTERB'
        },
        'currency': 'GBP'
    },
    
    'TIBOR': {
        'root_names': (
            ('TIBOR',),
            ('TOKYO', 'INTERBANK', 'OFFERED'),
        ),
        'isins': set(),
        'code': 'TIBO',
        'names': {
            'TIBOR'
        },
        'currency': 'JPY'
    },
    
    'TONAR': {
        'root_names': (
            ('TONAR',),
            ('TOKYO', 'OVERNIGHT', 'AVERAGE')
        ),
        'isins': set(),
        'code': False,
        'names': {
            'TONAR'
        },
        'currency': 'JPY'
    },

    'ESTR': {
        'root_names': (
            ('ESTR',),
            ('EURO', 'SHORT', 'TERM', 'RATE'),
            ('EURO', 'SHORT-TERM', 'RATE')
        ),
        'isins': {
            'EU000A2X2A25'
        },
        'code': 'ESTR',
        'names': {
            '€STR',
        },
        'currency': 'GBP'
    },

    'SOFR': {
        'root_names': (
            ('SOFR',),
            ('SECURED', 'OVERNIGHT', 'FINANCING', 'RATE')
        ),
        'isins': set(),
        'code': 'SOFR',
        'names': {
            'SOFR',
            'SECURED OVERNIGHT FINANCING RATE'
        },
        'currency': 'USD'
    },

    'SARON': {
        'root_names': (
            ('SARON',),
            ('SWISS AVERAGE RATE OVERNIGHT')
        ),
        'isins': {
            'CH0049613687'
        },
        'code': False,
        'names': {
            'SARON',
            'SWISS AVERAGE RATE OVERNIGHT'
        },
        'currency': 'CHF'
    },
}

bm_names = {' '.join((bm['currency'], 'LIBOR')) for bm in libors if bm['currency'] is not None} | non_libors.keys()

replacements = {
    'GBP LIBOR': 'SONIA',
    'USD LIBOR': 'SOFR',
    'CHF LIBOR': 'SARON',
    'EUR LIBOR': 'ESTR',
    'EURIBOR': 'ESTR',
    'EONIA': 'ESTR'
}


In [7]:
currency_mismatch = {}
def is_benchmark(bm_data: dict, ir_data: dict, check_code: bool = True) -> Tuple[bool, Optional[str]]:
    name = ir_data.get('index_name')
    code = ir_data.get('index_code')
    isin = ir_data.get('index_isin')
    
    if check_code and (code == bm_data['code']):
            return True, 'code'
    if isin in bm_data['isins']:
        return True, 'isin'
    if not name:
        return False, None
    name = name.upper()
    if (name in bm_data['names'] | bm_data['isins']) or (name == bm_data['code']):
        # Check if index_name is one of the benchmark's recognised names, or is one of the benchmark's
        # recognised ISINs or is the benchmark's code (the latter two happen sometimes)
        return True, 'name'
    if any(all(word.upper() in re.split('[ \-+]', name) for word in root_name) for root_name in bm_data['root_names']):
        bm_data['names'].add(name)
        return True, 'root_name'
    return False, None

def is_libor(ir_data: dict, currency: str) -> Tuple[Union[str, bool], Optional[str]]:
    for bm_data in libors:
        if bm_data['currency'] is None:
            check_code = True
        else:
            check_code = False
        is_match, match_type = is_benchmark(bm_data, ir_data, check_code)
        if is_match:
            bm_currency = bm_data['currency']
            if bm_currency is None:
                # Security has matched generic_libor, so we just guess LIBOR currency
                # from currency of security.
                return currency, match_type
            else:
                return bm_currency, match_type
    return False, None

def get_benchmark(ir_data: dict, currency: str, isin: str = None) -> Tuple[Optional[str], Optional[str]]:
    
    benchmark = None
    
    libor_currency, match_type = is_libor(ir_data, currency)
    if libor_currency:
        benchmark = ' '.join((libor_currency, 'LIBOR'))
        bm_currency = libor_currency
    else:
        for bm in non_libors:
            is_match, match_type = is_benchmark(non_libors[bm], ir_data, currency)
            if is_match:
                benchmark = bm
                bm_currency = non_libors[bm]['currency']
                break
    
    if benchmark is not None:
        if (isin is not None) and (bm_currency != currency):
            # Security has matched a specific currency LIBOR, but that does not match the
            # security's own currency (possibly indicates that one of them is wrong)
            currency_mismatch[isin] = (bm_currency, currency)
        return benchmark, match_type
    else:
        return None, None
    
    

In [8]:
def init_tracker():
    return {
        'last_isin': None,
        'floating': 0,
        'fixed': 0,
        'floating_uncat': {
            'index_name': {},
            'index_code': {},
            'index_isin': {}
        },
        'bm_counts': {bm: 0 for bm in bm_names},
        'duplicates': 0,
        'matured': 0,
        'delisted': 0,
        'zero_notional': 0
    }

def parse_security(s, tracker):
    _, term_date = get_tv_dates(s)
    if (term_date is not None) and (term_date < today_utc):
        tracker['delisted'] += 1
        return
    isin = get_isin(s)
    if isin == tracker['last_isin']:
        tracker['duplicates'] += 1
        return
    tracker['last_isin'] = isin
    if get_maturity_date(s) < today:
        tracker['matured'] += 1
        return
    if get_notional_amount(s) == 0.0:
        tracker['zero_notional'] += 1
        return
    ir_data = get_interest_rate(s)
    currency = get_currency(s)
    if ir_data['fixed_floating'] == 'floating':
        tracker['floating'] += 1
        bm, match_type = get_benchmark(ir_data, currency)
        if bm:
            tracker['bm_counts'][bm] += 1
        else:
            for identifier in ('index_isin', 'index_name', 'index_code'):
                if identifier in ir_data:
                    tracker['floating_uncat'][identifier][ir_data[identifier]] = tracker['floating_uncat'][identifier].get(ir_data[identifier], 0) + 1
    else:
        tracker['fixed'] += 1

In [9]:
def parse_file(fpath, tracker):
    for event, elem in etree.iterparse(fpath):
        if elem.tag.endswith('}RefData'):
            parse_security(elem, tracker)
            elem.clear()

In [17]:
%%time

tracker = init_tracker()

for fpath in debt_files:
    for event, elem in etree.iterparse(fpath):
        if elem.tag.endswith('}RefData'):
            parse_security(elem, tracker)
            elem.clear()
        

CPU times: user 3min 37s, sys: 1.27 s, total: 3min 38s
Wall time: 3min 40s


In [11]:
#%%time

#tracker = init_tracker()

#threads = [Thread(target=parse_file, args=(fpath, tracker)) for fpath in debt_files]
#for t in threads:
#    t.start()
#for t in threads:
#    t.join()

In [13]:
%%time

manager = Manager()
pool = Pool(processes=len(debt_files))

tracker = manager.dict(init_tracker())

pool.starmap(parse_file, ((fpath, tracker) for fpath in debt_files))
pool.close()
pool.join()

CPU times: user 60.8 ms, sys: 20.6 ms, total: 81.4 ms
Wall time: 10min 3s


In [18]:
tracker

{'last_isin': 'XS1970667413',
 'floating': 39014,
 'fixed': 435760,
 'floating_uncat': {'index_name': {'FORMULA': 6002,
   'OTHR': 22,
   'OTROS': 853,
   'Subyacente Acciones': 257,
   'Stockholm Interbank Offer': 19,
   'SONA': 26,
   'NATIXIS TEC 10 Constant': 6,
   'Fonds': 10,
   'STBO': 67,
   'LIBI': 4,
   'N/A': 7,
   'LIBID - LIBI': 1,
   'Mexico Interbank TIIE': 1,
   'Formula': 113,
   'Hibor': 1,
   'NIBOR': 10,
   'INDEX RATE': 1,
   'PRBO': 4,
   'WIBOR': 2,
   'USD CMS Rate': 1,
   'Index Rate + 1.45%': 1,
   'Other': 1,
   'MULTIASSET DIV VOL2 ': 3,
   'CMS 10 ANS': 21,
   'CMS 10 ANNI': 3,
   'CMS 30 ANS': 5,
   'OOOOOOOOOOO0': 3,
   'ROBOR': 15,
   'EURON_DEP_1D': 13,
   'Canada Bankers Acceptance': 4,
   'Canadian Dollar Bankers': 2,
   'NIBOR 3 MOIS': 1,
   '5-Jahres-CHF-Swapsatz': 2,
   'Diff 10 to2 usd swap rate': 2,
   'Saudi Riyal Interbank': 1,
   'TREA': 2,
   'Floater': 4,
   'NZD BB 3 MESI': 1,
   'ASX Australian Bank': 11,
   'Oslo Bors Norway Interban': 18

In [None]:
libors

In [None]:

def search_isins(isins: Set[str], fpath: str) -> Tuple[Dict[str, Dict[str, str]], Set[str]]:
    results = {}
    missing = isins.copy()
    for event, elem in etree.iterparse(fpath):
        if elem.tag.endswith('}RefData'):
            isin = elem[0][0].text
            if isin in missing:
                currency = elem[0][4].text
                lei = elem[1].text
                nominal = (currency, float(elem[3][0].text))
                #maturity = datetime.strptime(elem[3][1].text, '%Y-%m-%d')
                #denom = float(elem[3][2].text)
                rca = elem[4][0].text
                results[isin] = {
                    'Currency': currency,
                    'Issuer LEI': lei,
                    'Competent Authority': rca,
                    'Nominal Amount': nominal
                }
                missing.remove(isin)
            elem.clear()
    return results, missing
                
def search_all_files(isins: Set[str], fpaths: List[str]) -> Tuple[Dict[str, Tuple[str]], Set[str]]:
    logging.info('Searching FIRDS XML files.')
    results = {}
    missing = isins.copy()
    for fpath in fpaths:
        _results, _missing = search_isins(missing, fpath)
        results.update(_results)
        missing = _missing
    return results, missing