<a href="https://colab.research.google.com/github/Roshini14/nasdaq_itch50/blob/main/vwap.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip install bitstring



In [2]:
import gzip
import shutil
from struct import unpack
from collections import namedtuple, defaultdict
from pathlib import Path
from urllib.request import urlretrieve
from urllib.parse import urljoin
import pandas as pd
import numpy as np

In [3]:
url = 'https://emi.nasdaq.com/ITCH/Nasdaq%20ITCH/01302019.NASDAQ_ITCH50.gz'
data_path = Path('data')
if not data_path.exists():
    print('Creating directory')
    data_path.mkdir()
else:
    print('Directory exists')

filename = data_path / url.split('/')[-1]
if not filename.exists():
    print('Downloading...', url)
    urlretrieve(url, filename)
else:
    print('File exists')

unzipped = data_path / (filename.stem + '.bin')
if not unzipped.exists():
    print('Unzipping to', unzipped)
    with gzip.open(str(filename), 'rb') as f_in:
        with open(unzipped, 'wb') as f_out:
            shutil.copyfileobj(f_in, f_out)
            print("printing")
else:
    print('File already unpacked')

Directory exists
File exists
File already unpacked


In [4]:
message_types = pd.read_csv('message_types.csv')
message_types.loc[:, 'formats'] = (message_types[['value', 'length']]
                            .apply(tuple, axis=1).map(formats))
message_fields, fstring = {}, {}
for t, message in message_types.groupby('message_type'):
    message_fields[t] = namedtuple(typename=t, field_names=message.name.tolist())
    fstring[t] = '>' + ''.join(message.formats.tolist())
formats = {
    ('integer', 2): 'H',  # int of length 2 => format string 'H'
    ('integer', 4): 'I',
    ('integer', 6): '6s',  # int of length 6 => parse as string, convert later
    ('integer', 8): 'Q',
    ('alpha',   1): 's',
    ('alpha',   2): '2s',
    ('alpha',   4): '4s',
    ('alpha',   8): '8s',
    ('price_4', 4): 'I',
    ('price_8', 8): 'Q',
}

In [5]:
df_trades = pd.DataFrame()
df_oe = pd.DataFrame()
df_bt = pd.DataFrame()
df_stockdir = pd.DataFrame()
df_final = pd.DataFrame()
df_vwap = pd.DataFrame()
id_to_price = {}

In [6]:
class ITCHParser:
    def __init__(self, file_name):
        # Initialize ITCHParser with file_name and default values
        self.file_path = file_name
        self.messages = defaultdict(list)
        self.message_count = 0
        self.latest_ts = 34200
        self.is_market_open = False
        self.latest_hour = 34200
        self.columns = {
            "stock": ["stock_locate", "stock"],
            "add": ["stock_locate", "price"],
            "trades": ["stock_locate", "shares", "price", "cross_type", "match_number"],
            "oe": ['stock_locate', 'shares', 'match_number'],
            "bt": ['match_number']
        }

    def parse(self):
        try:
            with open(self.file_path, 'rb') as file:
                while True:
                    # Read message size and type from file
                    message_size = int.from_bytes(file.read(2), byteorder='big', signed=False)
                    msg_type = file.read(1)
                    try:
                        record = file.read(message_size - 1)
                        # Process non-excluded message types
                        if msg_type not in {b'H', b'Y', b'L', b'V', b'W', b'K', b'J', b'h', b'X', b'D', b'I', b'O'}:
                            message = message_fields[msg_type.decode('latin-1')]._make(
                                unpack(fstring[msg_type.decode('latin-1')], record))
                            # Check if market is open or message type is R
                            if msg_type != b'S' and (self.is_market_open or msg_type == b'R'):
                                self.messages[msg_type.decode('latin-1')].append(message)
                    except Exception as e:
                        # Handle exception while decoding message
                        print(e)
                        print(msg_type)

                    # Process messages based on conditions
                    if msg_type not in {b'H', b'Y', b'L', b'V', b'W', b'K', b'J', b'h', b'X', b'D', b'I', b'O'}:
                        seconds = int.from_bytes(message.timestamp, byteorder='big') * 1e-9
                        # Check and update timestamp for market open and hourly calculations
                        if self.is_market_open and seconds > self.latest_ts + 120:
                            self.store_messages()
                            vwap_chunk()
                            self.messages.clear()
                            self.latest_ts = round(seconds / 120) * 120
                        if self.is_market_open and seconds > self.latest_hour + 3600:
                            compute_vwap(self.latest_hour)
                            self.latest_hour += 3600
                        if msg_type == b'S':
                            if message.event_code.decode('ascii') == 'C':
                                self.store_messages()
                                break
                            if message.event_code.decode('ascii') == 'Q':
                                self.is_market_open = True
                            if message.event_code.decode('ascii') == 'M':
                                vwap_chunk()
                                compute_vwap(self.latest_hour)
                                self.is_market_open = False

        except Exception as e:
            print(f"Error: {e} {msg_type}")

    def store_messages(self):
        """Handle occasional storing of all messages"""
        global df_stockdir, df_trades, df_oe, df_bt
        for mtype, data in self.messages.items():
            try:
                data = pd.DataFrame(data)
                if mtype == 'R':
                    df_stockdir = pd.concat([df_stockdir, data[self.columns['stock']]])
                elif mtype in {'A', 'F', 'U'}:
                    data['price'] = data['price'] / 10 ** 4
                    id_to_price.update(dict(zip(data['stock_locate'], data['price'])))
                elif mtype == 'E':
                    df_oe = pd.concat([df_oe, data[self.columns['oe']]])
                elif mtype in {'C', 'P', 'Q'}:
                    if "cross_type" not in data.columns:
                        data["cross_type"] = b'T'
                    df_trades = pd.concat([df_trades, data[self.columns['trades']]])
                elif mtype == 'B':
                    df_bt = pd.concat([df_bt, data[self.columns['bt']]])
                else:
                    continue
            except Exception as e:
                print(f"Error processing message type {mtype}: {e}")
                return 1
        return 0

In [7]:
def vwap_chunk():
    # Declare global variables to update them within the function
    global df_trades, df_bt, df_oe, df_final, id_to_price

    # Check if df_trades is not empty
    if not df_trades.empty:
        # Adjust price in df_trades
        df_trades['price'] = df_trades['price'] / 10**4

        # Map prices to df_oe using id_to_price dictionary
        df_oe['price'] = df_oe['stock_locate'].map(id_to_price)

        # Combine relevant columns from df_trades and df_oe
        df_alltrades = pd.concat([df_trades[df_trades['cross_type'] != b'N'],
                                  df_oe[['stock_locate', 'shares', 'price', 'match_number']]],
                                 ignore_index=True, sort=False)

        # Initialize df_fintrades with df_alltrades
        df_fintrades = df_alltrades

        # Check if df_bt is not empty
        if not df_bt.empty:
            # Mark broken trades in df_bt
            df_bt['broken'] = 1

            # Merge df_alltrades with df_bt using a left join
            df_fintrades = df_alltrades.merge(df_bt, how='left')

            # Fill NaN values in 'broken' column with 0
            df_fintrades['broken'] = df_fintrades['broken'].fillna(0)

            # Filter out broken trades
            df_fintrades = df_fintrades[df_fintrades['broken'] == 0].reset_index(drop=True)

        # Convert columns to numeric, if possible
        df_fintrades.apply(lambda col: pd.to_numeric(col, errors='coerce'))

        # Calculate volume and vwp
        df_fintrades['volume'] = df_fintrades['shares'] * df_fintrades['price']
        df_fintrades["vwp"] = df_fintrades['volume'] * df_fintrades["price"]

        # Concatenate df_final with df_fintrades
        df_final = pd.concat([df_final, df_fintrades[['stock_locate', 'volume', 'vwp']]])

        # Reset index of df_final
        df_final = df_final.reset_index(drop=True)

        # Reset df_trades, df_oe, and df_bt to empty DataFrames
        df_trades = pd.DataFrame()
        df_oe = pd.DataFrame()
        df_bt = pd.DataFrame()

In [8]:
def compute_vwap(hour):
    global df_vwap
    df_stockwise = df_final[['stock_locate','volume','vwp']].groupby('stock_locate').sum()
    df_stockwise['vwap_hourly'] = df_stockwise['vwp']/df_stockwise['volume']
    df_vwap_hourly = df_stockdir.merge(df_stockwise, how='left', on=['stock_locate'])[['stock_locate', 'stock', 'volume', 'vwap_hourly']]
    df_vwap_hourly['stock'] = df_vwap_hourly['stock'].apply(lambda x: x.decode('latin-1').strip(' '))
    if df_vwap.empty:
        df_vwap = pd.concat([df_vwap, df_vwap_hourly[['stock','vwap_hourly']]], axis = 1)
    else:
        df_vwap = pd.merge(df_vwap, df_vwap_hourly[['stock','vwap_hourly']], on='stock', suffixes=('_'+str(hour//3600-1), '_'+str(hour//3600)))
    print(df_vwap)

In [9]:
parser = ITCHParser(unzipped)
parser.parse()

       stock  vwap_hourly
0          A    73.875951
1         AA    28.753590
2       AAAU          NaN
3       AABA    66.583718
4        AAC     2.176698
...      ...          ...
8709  ZXYZ.A          NaN
8710   ZXZZT          NaN
8711    ZYME    15.147712
8712    ZYNE     5.117610
8713   CFG-D    25.459448

[8714 rows x 2 columns]
       stock  vwap_hourly_9  vwap_hourly_10
0          A      73.875951       74.116453
1         AA      28.753590       28.931470
2       AAAU            NaN             NaN
3       AABA      66.583718       66.682560
4        AAC       2.176698        2.282354
...      ...            ...             ...
8711   ZXIET            NaN             NaN
8712  ZXYZ.A            NaN             NaN
8713   ZXZZT            NaN             NaN
8714    ZYME      15.147712       15.628626
8715    ZYNE       5.117610        5.119240

[8716 rows x 3 columns]
       stock  vwap_hourly_9  vwap_hourly_10  vwap_hourly
0          A      73.875951       74.116453    74.419

In [10]:
df_vwap

Unnamed: 0,stock,vwap_hourly_9,vwap_hourly_10,vwap_hourly_11,vwap_hourly_12,vwap_hourly_13,vwap_hourly_14,vwap_hourly
0,A,73.875951,74.116453,74.419959,74.340817,74.372846,74.850058,75.428961
1,AA,28.753590,28.931470,28.964342,28.998382,29.027929,29.109583,29.224947
2,AAAU,,,,,13.100000,13.100000,13.100000
3,AABA,66.583718,66.682560,66.762930,66.863329,66.948084,67.140023,67.597766
4,AAC,2.176698,2.282354,2.271846,2.272493,2.268790,2.269585,2.273249
...,...,...,...,...,...,...,...,...
8835,ZXIET,,,,,,,
8836,ZXYZ.A,,,,,,,
8837,ZXZZT,,,,,,,
8838,ZYME,15.147712,15.628626,15.519029,15.500919,15.629500,15.631514,15.622935


In [11]:
df_vwap.to_csv("final.csv")