# 0. Imports

In [2]:
import warnings
warnings.filterwarnings('ignore')

In [3]:
%matplotlib inline

# Misc
from struct import unpack
from collections import namedtuple, Counter, defaultdict

# Data and file management
import pandas as pd
import gzip
import json

# URL management
from urllib.request import urlretrieve
from urllib.parse import urljoin

# Time format
from datetime import timedelta
from time import time

# System
import os
import traceback
import sys
import shutil
from pathlib import Path

# Visualise
import matplotlib.pyplot as plt
from matplotlib.ticker import FuncFormatter
import seaborn as sns

# Literal eval
from ast import literal_eval

# 1. Setup boilerplate

In [4]:
sns.set_style("whitegrid")

In [5]:
with open('../config.json', 'r') as config_file:
    global configs
    global itch_config
    configs = json.load(config_file)
    itch_config = configs['itch-format']
    configs = configs['nasdaq-total-view-itch']

# 2. Format time function

In [6]:
def format_time(t):
    """Return a formatted time string 'HH:MM:SS
    based on a numeric time() value"""
    m, s = divmod(t, 60)
    h, m = divmod(m, 60)
    return f'{h:0>2.0f}:{m:0>2.0f}:{s:0>5.2f}'

# 3. Download Data Fucntion

In [7]:
def download_data(data_path: str, url: str) -> dict:
    """
    Download and unzip ITCH data if not yet available
    """

    # Basic initialisation
    itch_store = str(data_path / 'itch.h5')
    order_book_store = data_path / 'order_book.h5'

    # If data path does not exist create one
    if not data_path.exists():
        try:
            print("Creating dir...")
            data_path.mkdir()
        except Exception as data_path_e:
           error_type, error_obj, error_trace = sys.exc_info()
           return {"status": False, 
                   "error_type": error_type, 
                   "error_message": str(data_path_e), 
                   "error_line": error_trace.tb_lineno, 
                   "error_module": "download_data()"} 
        
    else:
        print("Directory exists")
        
    
    # Download file if it doesn't exist
    file_name = data_path / url.split('/')[-1]
    if not file_name.exists():
        try:
            print("Downloading ..." , url)
            urlretrieve(url, file_name)
        except Exception as file_download_e:
            return {"status": False, 
                   "error_type": error_type, 
                   "error_message": str(file_download_e), 
                   "error_line": error_trace.tb_lineno, 
                   "error_module": "download_data()"} 
        
    else:
        print("File Exists")

    # Unpacking file
    unzipped = data_path / (file_name.stem + ".bin")
    if not unzipped.exists():
        try:
            print("Unzipping to ", unzipped)
            with gzip.open(str(file_name), 'rb') as f_in:
                with open(unzipped, 'wb') as f_out:
                    shutil.copyfileobj(f_in, f_out)

        except Exception as file_cp_e:
            return {"status": False, 
                   "error_type": error_type, 
                   "error_message": str(file_cp_e), 
                   "error_line": error_trace.tb_lineno, 
                   "error_module": "download_data()"} 
        

    else:
        print("File already unpacked")

    return {"status" : True,
            "response" : unzipped}
        

In [8]:
data_path = configs['data-path']
http_url = configs['https-url']
src_file = configs['src-file']

In [9]:
data_path = Path(data_path)
itch_store = str(data_path / 'itch.h5')
order_book_store = data_path / 'order_book.h5'

In [10]:
result = download_data(data_path=data_path, url = urljoin(http_url, src_file))
if result['status']:
    global file_name
    file_name = result['response']
else:
    print(result)

Directory exists
File Exists
File already unpacked


# 4. Setting up ITCH Format

In [11]:
event_codes = itch_config['event-codes']
encoding = itch_config['encodings']
formats = {literal_eval(key) : value for key, value in itch_config['formats'].items()}

# 5. Load message types

In [12]:
message_data = pd.read_excel(io='message_types.xlsx',sheet_name='messages').sort_values('id').drop('id', axis=1)

In [13]:
message_data.head()

Unnamed: 0,Name,Offset,Length,Value,Notes
0,Message Type,0,1,S,System Event Message
1,Stock Locate,1,2,Integer,Always 0
2,Tracking Number,3,2,Integer,Nasdaq internal tracking number
3,Timestamp,5,6,Integer,Nanoseconds since midnight
4,Event Code,11,1,Alpha,See System Event Codes below


# 6. Message Cleaning

In [14]:
def clean_message_types(df: pd.DataFrame)-> dict:
    df.columns = [column.lower().strip() for column in df.columns]
    df.value = df.value.str.strip()
    df.name = (df.name
               .str.strip()
               .str.lower()
               .str.replace(' ', "_")
               .str.replace('-', "_")
               .str.replace('/', "_")
               )
    df.notes = df.notes.str.strip()
    df['message_type'] = df.loc[df.name == "message_type", "value"]
    return df

In [15]:
message_types = clean_message_types(message_data)

# 7. Message Labels

In [16]:
message_labels = (message_types.loc[:, ['message_type', 'notes']]
                  .dropna()
                  .rename(columns={'notes': 'name'}))
                  
message_labels.name = (message_labels.name
                       .str.lower()
                       .str.replace('message', '')
                       .str.replace('.', '')
                       .str.strip().str.replace(' ', '_'))
# message_labels.to_csv('message_labels.csv', index=False)
message_labels.head()

Unnamed: 0,message_type,name
0,S,system_event
5,R,stock_directory
23,H,stock_trading_action
31,Y,reg_sho_short_sale_price_test_restricted_indic...
37,L,market_participant_position


In [17]:
message_types.message_type = message_types.message_type.ffill()
message_types = message_types[message_types.name != 'message_type']
message_types.value = (message_types.value
                       .str.lower()
                       .str.replace(' ', '_')
                       .str.replace('(', '')
                       .str.replace(')', ''))
message_types.info()

<class 'pandas.core.frame.DataFrame'>
Index: 152 entries, 1 to 172
Data columns (total 6 columns):
 #   Column        Non-Null Count  Dtype 
---  ------        --------------  ----- 
 0   name          152 non-null    object
 1   offset        152 non-null    int64 
 2   length        152 non-null    int64 
 3   value         152 non-null    object
 4   notes         152 non-null    object
 5   message_type  152 non-null    object
dtypes: int64(2), object(4)
memory usage: 8.3+ KB


In [18]:
message_types.head()

Unnamed: 0,name,offset,length,value,notes,message_type
1,stock_locate,1,2,integer,Always 0,S
2,tracking_number,3,2,integer,Nasdaq internal tracking number,S
3,timestamp,5,6,integer,Nanoseconds since midnight,S
4,event_code,11,1,alpha,See System Event Codes below,S
6,stock_locate,1,2,integer,Locate Code uniquely assigned to the security ...,R


In [19]:
message_types.to_csv('message_types.csv', index=False)

In [20]:
message_types = pd.read_csv('message_types.csv')


# 8. Build a Parser


The parser translates the message specs into format strings and namedtuples that capture the message content. First, we create (type, length) formatting tuples from ITCH specs:

In [21]:
message_types.loc[:, 'formats'] = (message_types[['value', 'length']]
                            .apply(tuple, axis=1).map(formats))

In [22]:
alpha_fields = message_types[message_types.value == 'alpha'].set_index('name')
alpha_msgs = alpha_fields.groupby('message_type')
alpha_formats = {k: v.to_dict() for k, v in alpha_msgs.formats}
alpha_length = {k: v.add(5).to_dict() for k, v in alpha_msgs.length}

In [23]:
message_fields, fstring = {}, {}
for t, message in message_types.groupby('message_type'):
    message_fields[t] = namedtuple(typename=t, field_names=message.name.tolist())
    fstring[t] = '>' + ''.join(message.formats.tolist())

In [24]:
alpha_fields.info()

<class 'pandas.core.frame.DataFrame'>
Index: 45 entries, event_code to price_variation_indicator
Data columns (total 6 columns):
 #   Column        Non-Null Count  Dtype 
---  ------        --------------  ----- 
 0   offset        45 non-null     int64 
 1   length        45 non-null     int64 
 2   value         45 non-null     object
 3   notes         45 non-null     object
 4   message_type  45 non-null     object
 5   formats       45 non-null     object
dtypes: int64(2), object(4)
memory usage: 2.5+ KB


In [25]:
alpha_fields.head()

Unnamed: 0_level_0,offset,length,value,notes,message_type,formats
name,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1
event_code,11,1,alpha,See System Event Codes below,S,s
stock,11,8,alpha,Denotes the security symbol for the issue in t...,R,8s
market_category,19,1,alpha,Indicates Listing market or listing market tie...,R,s
financial_status_indicator,20,1,alpha,"For Nasdaq listed issues, this field indicates...",R,s
round_lots_only,25,1,alpha,Indicates if Nasdaq system limits order entry ...,R,s


# 9. Function to process alpha field

In [26]:
def format_alpha(mtype, data):
    """Process byte strings of type alpha"""

    for col in alpha_formats.get(mtype).keys():
        if mtype != 'R' and col == 'stock':
            data = data.drop(col, axis=1)
            continue
        data.loc[:, col] = data.loc[:, col].str.decode("utf-8").str.strip()
        if encoding.get(col):
            data.loc[:, col] = data.loc[:, col].map(encoding.get(col))
    return data

In [27]:
def store_messages(m):
    """Handle occasional storing of all messages"""
    with pd.HDFStore(itch_store) as store:
        for mtype, data in m.items():
            # convert to DataFrame
            data = pd.DataFrame(data)

            # parse timestamp info
            data.timestamp = data.timestamp.apply(int.from_bytes, byteorder='big')
            data.timestamp = pd.to_timedelta(data.timestamp)

            # apply alpha formatting
            if mtype in alpha_formats.keys():
                data = format_alpha(mtype, data)

            s = alpha_length.get(mtype)
            if s:
                s = {c: s.get(c) for c in data.columns}
            dc = ['stock_locate']
            if m == 'R':
                dc.append('stock')
            try:
                store.append(mtype,
                         data,
                         format='t',
                         min_itemsize=s,
                         data_columns=dc)
                
            except Exception as e:
                print(e)
                print(mtype)
                print(data.info())
                print(pd.Series(list(m.keys())).value_counts())
                data.to_csv('data.csv', index=False)
                return 1
    return 0

In [28]:
messages = defaultdict(list)
message_count = 0
message_type_counter = Counter()

In [29]:
start = time()
with file_name.open('rb') as data:
    while True:

        # determine message size in bytes
        message_size = int.from_bytes(data.read(2), byteorder='big', signed=False)
        
        # get message type by reading first byte
        message_type = data.read(1).decode('ascii')        
        message_type_counter.update([message_type])

        # read & store message
        try:
            record = data.read(message_size - 1)
            message = message_fields[message_type]._make(unpack(fstring[message_type], record))
            messages[message_type].append(message)
        except Exception as e:
            print(e)
            print(message_type)
            print(record)
            print(fstring[message_type])
        
        # deal with system events
        if message_type == 'S':
            seconds = int.from_bytes(message.timestamp, byteorder='big') * 1e-9
            print('\n', event_codes.get(message.event_code.decode('ascii'), 'Error'))
            print(f'\t{format_time(seconds)}\t{message_count:12,.0f}')
            if message.event_code.decode('ascii') == 'C':
                store_messages(messages)
                break
        message_count += 1

        if message_count % 2.5e7 == 0:
            seconds = int.from_bytes(message.timestamp, byteorder='big') * 1e-9
            d = format_time(time() - start)
            print(f'\t{format_time(seconds)}\t{message_count:12,.0f}\t{d}')
            print
            res = store_messages(messages)
            if res == 1:
                print(pd.Series(dict(message_type_counter)).sort_values())
                break
            messages.clear()

print('Duration:', format_time(time() - start))


 Start of Messages
	03:02:31.65	           0

 Start of System Hours
	04:00:00.00	     241,258

 Start of Market Hours
	09:30:00.00	   9,559,279
	09:44:09.23	  25,000,000	00:00:51.89
Cannot serialize the column [primary_market_maker]
because its data contents are not [string] but [integer] object dtype
L
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 214749 entries, 0 to 214748
Data columns (total 7 columns):
 #   Column                    Non-Null Count   Dtype          
---  ------                    --------------   -----          
 0   stock_locate              214749 non-null  int64          
 1   tracking_number           214749 non-null  int64          
 2   timestamp                 214749 non-null  timedelta64[ns]
 3   mpid                      214749 non-null  object         
 4   primary_market_maker      214749 non-null  object         
 5   market_maker_mode         214749 non-null  object         
 6   market_participant_state  214749 non-null  object         
dtypes:

# 10. Summarize Trading Data

In [30]:
counter = pd.Series(message_type_counter).to_frame('# Trades')
counter['Message Type'] = counter.index.map(message_labels.set_index('message_type').name.to_dict())
counter = counter[['Message Type', '# Trades']].sort_values('# Trades', ascending=False)
counter

Unnamed: 0,Message Type,# Trades
A,add_order_no_mpid_attribution,10094291
D,order_delete,9044692
U,order_replace,2132765
X,order_cancel,1086393
I,noii,1072326
F,add_order_mpid_attribution,836655
E,order_executed,364951
L,market_participant_position,214749
P,trade,108412
C,order_executed_with_price,9176


# 11. Understandng trades

In [32]:
with pd.HDFStore(itch_store) as store:
    store.put('summary', counter)

In [73]:
with pd.HDFStore(itch_store) as store:
    stocks = store['R'].loc[:, ['stock_locate', 'stock']]

['/H', '/L', '/R', '/S', '/Y', '/summary']
    stock_locate  tracking_number                 timestamp stock  \
0              1                0 0 days 03:07:05.590649696     A   
1              2                0 0 days 03:07:05.590744771    AA   
2              3                0 0 days 03:07:05.590790029  AAAU   
3              4                0 0 days 03:07:05.590859921  AACG   
4              5                0 0 days 03:07:05.590902869  AADR   
5              6                0 0 days 03:07:05.590945996   AAL   
6              7                0 0 days 03:07:05.590987138  AAMC   
7              8                0 0 days 03:07:05.591027925  AAME   
8              9                0 0 days 03:07:05.591068684   AAN   
9             10                0 0 days 03:07:05.591111873  AAOI   
10            11                0 0 days 03:07:05.591152425  AAON   
11            12                0 0 days 03:07:05.591192615   AAP   
12            13                0 0 days 03:07:05.591233982 