# Download and Parse a Sample File of ITCH Messages
--- 
[GITHUB](https://github.com/PacktPublishing/Machine-Learning-for-Algorithmic-Trading-Second-Edition/blob/master/02_market_and_fundamental_data/01_NASDAQ_TotalView-ITCH_Order_Book/01_parse_itch_order_flow_messages.ipynb)

In [None]:
import warnings

# Suppress every warning category to keep the notebook output readable
warnings.filterwarnings(action='ignore')

In [None]:
%matplotlib inline
import gzip
import shutil 

import pandas as pd 
import matplotlib.pyplot as plt
import seaborn as sns 

from matplotlib.ticker import FuncFormatter 
from struct import unpack
from collections import namedtuple, Counter, defaultdict
from pathlib import Path
from urllib.request import urlretrieve
from urllib.parse import urljoin
from datetime import timedelta
from time import time


In [None]:
sns.set_style(style='whitegrid')  # white background with grid lines for all plots

In [None]:
def format_time(t):
    """Render a duration given in seconds as an 'HH:MM:SS.ss' string.

    Intended for non-negative values such as the difference of two
    ``time()`` readings.
    """
    total_minutes, secs = divmod(t, 60)
    hrs, mins = divmod(total_minutes, 60)
    return '{:0>2.0f}:{:0>2.0f}:{:0>5.2f}'.format(hrs, mins, secs)

# ITCH Format Settings
---
### From [message_types.xlsx](message_types.xlsx)
### [`struct`](https://docs.python.org/3/library/struct.html) module for binary data 
- ITCH tick data comes in binary format 
- `struct` parses binary data using format strings 
    - identifies the message element by indicating length and type of various components of the byte string 
    - conversions between Python values and C structs represented as Python byte objects 

### Defining Format Strings

In [None]:
# System event codes carried by 'S' messages -> readable descriptions
# ('O' is "Start of Messages" in the ITCH 5.0 spec, mirroring 'C')
event_codes = {'O': 'Start of Messages',
               'S': 'Start of System Hours',
               'Q': 'Start of Market Hours',
               'M': 'End of Market Hours',
               'E': 'End of System Hours',
               'C': 'End of Messages'}

# Numeric encodings applied to decoded alpha fields (used by format_alpha)
encoding = {'primary_market_maker': {'Y': 1, 'N': 0},
            'printable'           : {'Y': 1, 'N': 0},
            'buy_sell_indicator'  : {'B': 1, 'S': -1},
            'cross_type'          : {'O': 0, 'C': 1, 'H': 2},
            # NOTE(review): 'B' and 'N' both map to 0, so these two directions
            # cannot be told apart after encoding -- confirm this is intended
            'imbalance_direction' : {'B': 0, 'S': 1, 'N': 0, 'O': -1}}

# (field type, byte length) -> struct format code. Format strings are built
# with a '>' prefix (big-endian, standard sizes): H = 2-byte, I = 4-byte,
# Q = 8-byte unsigned ints; 'Ns' = N raw bytes.
formats = {
    ('integer', 2): 'H',
    ('integer', 4): 'I',
    ('integer', 6): '6s',  # no 6-byte struct int: keep raw bytes, convert later
    ('integer', 8): 'Q',
    ('alpha',   1): 's',
    ('alpha',   2): '2s',
    ('alpha',   4): '4s',
    ('alpha',   8): '8s',
    ('price_4', 4): 'I',   # fixed-point price stored as an unsigned int
    ('price_8', 8): 'Q',
}

## Create Message Specs for Binary Data Parser 
### Load Message Types 
- `message_types.xlsx` contains the message type specs (per [ITCH Protocol Documentation](https://www.nasdaqtrader.com/content/technicalsupport/specifications/dataproducts/NQTVITCHSpecification.pdf))

In [None]:
# Load the raw field specs, order them by 'id', then discard that column
message_data = pd.read_excel('message_types.xlsx', sheet_name='messages')
message_data = message_data.sort_values(by='id').drop(columns='id')


In [None]:
message_data.iloc[:5]  # preview the first five spec rows

### Clean Message Types
- function `clean_message_types()` runs basic cleaning steps 

In [None]:
def clean_message_types(df):
    """Normalize the raw message-spec table in place and return it.

    - column labels are lower-cased and stripped of surrounding whitespace
    - 'value' and 'notes' are stripped of surrounding whitespace
    - 'name' is stripped, lower-cased, and ' ', '-', '/' become '_'
    - a new 'message_type' column holds 'value' on rows whose name is
      'message_type' and NaN on every other row
    """
    df.columns = [label.lower().strip() for label in df.columns]
    for text_col in ('value', 'notes'):
        df[text_col] = df[text_col].str.strip()

    to_underscore = str.maketrans(' -/', '___')
    df['name'] = df['name'].str.strip().str.lower().str.translate(to_underscore)

    is_header = df['name'] == 'message_type'
    df['message_type'] = df['value'].where(is_header)
    return df

In [None]:
message_types = clean_message_types(df=message_data)  # note: cleans message_data in place

### Get Message Labels
- extract message type codes and names to make results more readable 

In [None]:
# Readable labels per message type: only header rows carry a 'message_type'
# code, so dropna keeps one (code, notes) pair per message type
message_labels = (message_types.loc[:, ['message_type', 'notes']]
                  .dropna()
                  .rename(columns={'notes': 'name'}))

# Normalize labels, e.g. 'System Event Message.' -> 'system_event'.
# regex=False everywhere: '.' must be removed as a literal dot, not treated
# as a regex wildcard (pandas' default for `regex` differs across versions).
message_labels['name'] = (message_labels['name']
                          .str.lower()
                          .str.replace('message', '', regex=False)
                          .str.replace('.', '', regex=False)
                          .str.strip()
                          .str.replace(' ', '_', regex=False))

message_labels.head()

### Finalize Specification Details 
- [Struct](https://docs.python.org/3/library/struct.html) Module: use format information to parse binary source data 
- Messages consist of fields defined by offset, length, and type of value

---
### *Option to Reload Cleaned `message_types.xlsx` from `.csv` File*

In [None]:
message_types.to_csv(path_or_buf='message_types.csv', index=False)  # cache cleaned specs

In [None]:
message_types = pd.read_csv(filepath_or_buffer='message_types.csv')  # reload cached specs

---

### Translate Message Specs into Format Strings and `namedtuples` that Capture Message Content 
- create `(type, length)` formatting tuples from ITCH specs 

In [None]:
# Finalize the spec before mapping formats. Every step is idempotent, so this
# also works on the CSV reloaded above and when the cell is re-run.
# 1) Propagate each message's type code from its header row to its field rows
#    (assumes header rows precede their fields -- rows were sorted by 'id').
message_types['message_type'] = message_types['message_type'].ffill()
# 2) Drop the header rows: the type byte is read separately by the parser,
#    so it must not appear in the per-message struct format string.
message_types = message_types[message_types['name'] != 'message_type'].copy()
# 3) Normalize value types to the keys of `formats`,
#    presumably e.g. 'Integer' -> 'integer', 'Price (4)' -> 'price_4'.
message_types['value'] = (message_types['value']
                          .str.lower()
                          .str.replace(' ', '_', regex=False)
                          .str.replace('(', '', regex=False)
                          .str.replace(')', '', regex=False))
# (type, length) tuples -> struct format codes; unmapped pairs become NaN
message_types['formats'] = (message_types[['value', 'length']]
                            .apply(tuple, axis=1)
                            .map(formats))

- extract formatting details for alphanumeric (`alpha`) fields 

In [None]:
# Alpha (byte-string) fields, indexed by field name
alpha_fields = message_types.loc[message_types['value'] == 'alpha'].set_index('name')
alpha_msgs = alpha_fields.groupby('message_type')
# message type -> {field: struct format}
alpha_formats = {msg: grp.to_dict() for msg, grp in alpha_msgs['formats']}
# message type -> {field: length + 5}; used as HDF5 min_itemsize (extra headroom)
alpha_length = {msg: (grp + 5).to_dict() for msg, grp in alpha_msgs['length']}

- generate message classes as named tuples and format strings 

In [None]:
# Per message type: a namedtuple class for its fields and a big-endian
# struct format string covering the bytes after the type byte
message_fields = {}
fstring = {}
for msg_type, spec in message_types.groupby('message_type'):
    message_fields[msg_type] = namedtuple(msg_type, spec['name'].tolist())
    fstring[msg_type] = '>{}'.format(''.join(spec['formats'].tolist()))

In [None]:
# Overview of the alpha-field spec rows: column dtypes and non-null counts
alpha_fields.info()

In [None]:
alpha_fields.iloc[:5]  # preview the first five alpha-field specs

### Post-Processing
- required by fields of `alpha` type (alphanumeric)

In [None]:
def format_alpha(mtype, data, *, alpha_specs=None, encodings=None):
    '''
    Decode and clean the alpha (byte-string) fields of one message type.

    Parameters
    ----------
    mtype : str
        Message type code.
    data : pd.DataFrame
        Parsed messages of type `mtype`; alpha fields still hold bytes.
    alpha_specs : dict, optional
        message type -> {alpha field name: struct format}. Defaults to the
        module-level `alpha_formats`.
    encodings : dict, optional
        field name -> {decoded value: numeric code}. Defaults to the
        module-level `encoding`.

    Returns
    -------
    pd.DataFrame
        `data` with alpha fields decoded to stripped str (then mapped to
        numeric codes where an encoding exists). The 'stock' column is
        dropped for every type except 'R'. Types without alpha fields are
        returned unchanged.
    '''
    if alpha_specs is None:
        alpha_specs = alpha_formats
    if encodings is None:
        encodings = encoding

    for col in alpha_specs.get(mtype, {}):
        if mtype != 'R' and col == 'stock':
            # the ticker is kept only on 'R' messages; others can be joined
            # to it via stock_locate
            data = data.drop(col, axis=1)
            continue
        # assign the whole column (not data.loc[...]) so its dtype may change
        # from bytes to str and then to the encoded numbers
        data[col] = data[col].str.decode('utf-8').str.strip()
        codes = encodings.get(col)
        if codes:
            data[col] = data[col].map(codes)

    return data

# Get NASDAQ ITCH Data from FTP Server 
--- 
### From [Binary ITCH Data](data/)
- Nasdaq offers samples of daily binary files for several months 
- parse a sample file of ITCH messages 
- reconstruct executed trades and the order book for any given tick 
- Large dataset: expect long processing times and 16 GB+ of required space

### Set Data Paths
- store data in `data` subdirectory and convert result to `hdf` format 
- Sample Files: [NASDAQ ftp server](ftp://emi.nasdaq.com/ITCH/)
- `FTP_URL` changed from 'ftp://...' to 'https://emi.nasdaq.com/ITCH/Nasdaq%20ITCH/' per [forum](https://exchange.ml4trading.io/t/chapter-2-nasdaq-itch-ftp-error/626/9)

In [None]:
data_path = Path('data')  # point this at an external drive: the dataset is large
order_book_store = data_path.joinpath('order_book.h5')
itch_store = str(data_path.joinpath('itch.h5'))  # HDFStore path as a plain string

In [None]:
# Sample FTP Address, filename and corresponding date used in example
FTP_URL = 'https://emi.nasdaq.com/ITCH/Nasdaq%20ITCH/'
SOURCE_FILE = '10302019.NASDAQ_ITCH50.gz'

### Download and Unzip 

In [None]:
def may_be_downloaded(url, data_dir=None):
    '''
    Download and unzip an ITCH file unless it is already available.

    Parameters
    ----------
    url : str
        URL of the gzipped file; its last path segment is the local name.
    data_dir : str or Path, optional
        Target directory, created (with parents) if missing. Defaults to the
        module-level `data_path`.

    Returns
    -------
    Path
        Path of the unzipped '.bin' file.
    '''
    target_dir = data_path if data_dir is None else Path(data_dir)
    if not target_dir.exists():
        print('Creating Directory')
        target_dir.mkdir(parents=True, exist_ok=True)
    else:
        print('Directory Exists')

    filename = target_dir / url.split('/')[-1]
    if not filename.exists():
        print('Downloading...', url)
        # download under a temporary name so an interrupted transfer is not
        # mistaken for a complete file on the next run
        partial = filename.with_name(filename.name + '.part')
        urlretrieve(url, partial)
        partial.replace(filename)
    else:
        print('File Exists')

    unzipped = target_dir / (filename.stem + '.bin')
    if not unzipped.exists():
        print('Unzipping to', unzipped)
        # same idea: only a fully decompressed file gets the final name
        partial = unzipped.with_name(unzipped.name + '.part')
        with gzip.open(filename, 'rb') as f_in, open(partial, 'wb') as f_out:
            shutil.copyfileobj(f_in, f_out)
        partial.replace(unzipped)
    else:
        print('File Already Unpacked')

    return unzipped

In [None]:
# Fetch and unpack the sample if needed; the file name starts with the date
file_name = may_be_downloaded(url=urljoin(FTP_URL, SOURCE_FILE))
date = file_name.name.partition('.')[0]

# Process Binary Message Data 
---
- the binary file for a single day contains over **350,000,000** messages, *worth over 12 GB*
- appends parsed result iteratively to a file in the fast HDF5 format 

In [None]:
from sys import byteorder


def store_messages(m):
    '''
    Append buffered messages to the HDF5 store at `itch_store`.

    Parameters
    ----------
    m : dict
        message type code -> list of parsed message namedtuples.

    Returns
    -------
    int
        0 on success; 1 if appending a message type failed. In that case,
        diagnostics are printed and that type's DataFrame is written to
        'data.csv'.
    '''
    with pd.HDFStore(itch_store) as store:
        for mtype, data in m.items():
            if not data:
                continue
            data = pd.DataFrame(data)

            # Parse Timestamp Info: raw big-endian bytes ('6s') -> int -> Timedelta (ns)
            data['timestamp'] = data['timestamp'].apply(int.from_bytes, byteorder='big')
            data['timestamp'] = pd.to_timedelta(data['timestamp'])

            # Alpha Formatting: decode byte strings and apply encodings
            if mtype in alpha_formats:
                data = format_alpha(mtype, data)

            # minimum string widths so later appends with longer strings fit
            s = alpha_length.get(mtype)
            if s:
                s = {c: s.get(c) for c in data.columns}
            dc = ['stock_locate']
            if mtype == 'R':
                # only 'R' messages keep the ticker; make it queryable
                dc.append('stock')
            try:
                store.append(mtype, data, format='t', min_itemsize=s, data_columns=dc)

            except Exception as e:
                print(e)
                print(mtype)
                data.info()  # prints itself; wrapping it in print() adds 'None'
                print(pd.Series(list(m.keys())).value_counts())
                data.to_csv('data.csv', index=False)
                return 1
    return 0

In [None]:
# Parsing state: messages grouped by type, running count, per-type tallies
messages, message_count, message_type_counter = defaultdict(list), 0, Counter()

- processes binary file and produces parsed orders stored by message type 

In [None]:
start = time()
with file_name.open('rb') as data:
    while True:

        # determine message size (in bytes) from the 2-byte big-endian prefix
        header = data.read(2)
        if len(header) < 2:
            # end of file without an 'End of Messages' event: store what is buffered
            store_messages(messages)
            break
        message_size = int.from_bytes(header, byteorder='big', signed=False)

        # message type from first byte
        message_type = data.read(1).decode('ascii')
        message_type_counter.update([message_type])

        # read the rest of the message; the length prefix keeps the stream
        # aligned even when this record cannot be parsed
        record = data.read(max(message_size - 1, 0))
        try:
            message = message_fields[message_type]._make(unpack(fstring[message_type], record))
        except Exception as e:
            # report and skip the record instead of reusing the previous `message`
            print(e)
            print(message_type)
            print(record)
            print(fstring.get(message_type))
            message_count += 1
            continue
        messages[message_type].append(message)

        # System Events
        if message_type == 'S':
            event_code = message.event_code.decode('ascii')
            seconds = int.from_bytes(message.timestamp, byteorder='big') * 1e-9
            print('\n', event_codes.get(event_code, 'Error'))
            print(f'\t{format_time(seconds)}\t{message_count:12,.0f}')
            if event_code == 'C':
                # last message: store the whole buffer, not just this message
                store_messages(messages)
                break
        message_count += 1

        # periodically flush the buffer to HDF5 to bound memory use
        if message_count % 25_000_000 == 0:
            seconds = int.from_bytes(message.timestamp, byteorder='big') * 1e-9
            d = format_time(time() - start)
            print(f'\t{format_time(seconds)}\t{message_count:12,.0f}\t{d}')
            res = store_messages(messages)
            if res == 1:
                print(pd.Series(dict(message_type_counter)).sort_values())
                break
            messages.clear()

print('Duration: ', format_time(time() - start))

# Summarize Trading Day via [ITCH data](data/) and [message_types.csv](message_types.csv)
---

In [None]:
# Message counts per type with readable labels, most frequent first
counter = (pd.Series(message_type_counter)
           .to_frame('# Trades')
           .assign(**{'Message Type': lambda frame: frame.index.map(
               message_labels.set_index('message_type')['name'].to_dict())})
           .loc[:, ['Message Type', '# Trades']]
           .sort_values(by='# Trades', ascending=False))
counter

In [None]:
counter.to_hdf(itch_store, key='summary')  # save the per-type counts under 'summary'

### Top Equities by Traded Value

In [None]:
with pd.HDFStore(itch_store) as store:
    stocks = store['R'].loc[:, ['stock_locate', 'stock']]
    # Combine 'P' and 'Q' trade messages ('Q' names its price 'cross_price').
    # DataFrame.append was removed in pandas 2.0, so use pd.concat.
    trades = (pd.concat([store['P'],
                         store['Q'].rename(columns={'cross_price': 'price'})],
                        sort=False)
              .merge(stocks))  # joins on the columns shared with `stocks`

# fixed-point price scaling cancels out in the value share
trades['value'] = trades['shares'].mul(trades['price'])
trades['value_share'] = trades['value'].div(trades['value'].sum())

trade_summary = (trades.groupby('stock')['value_share']
                 .sum()
                 .sort_values(ascending=False))
ax = trade_summary.iloc[:50].plot.bar(figsize=(14, 6), color='darkblue',
                                      title='Share of Traded Value')

# show y-axis ticks as percentages
ax.yaxis.set_major_formatter(FuncFormatter(lambda y, _: '{:.0%}'.format(y)))
sns.despine()
plt.tight_layout()