In [1]:
# default_exp core

# ParseDeep

> Parse IEX DEEP files

In [2]:
#hide
from nbdev.showdoc import *

In [3]:
#export
from pcap import pcap
from struct import unpack, unpack_from, calcsize
from functools import cache

In [4]:
#hide
import pyarrow as pa 
import pyarrow.parquet as pq
import pandas as pd

In [5]:
#export
class Msg():
    "Class for working with a DEEP message"
    
    msg_type = None
    msg_sub_type = None
    
    def __init__(self, sequence_number, msg, debug=False):
        self.sequence_number = sequence_number
        self.type = msg[0:1]
        self.debug = debug
        if self.debug:
            self.bytes = msg
            
    @classmethod
    def matches_msg_type(cls, msg):
        return msg[0:1] == cls.msg_type
    
    @classmethod
    def matches_msg_sub_type(cls, msg):
        return True # override message type has subtypes
    
    @classmethod
    @cache
    def type_clss(cls):
        all = []
        for type_cls in cls.__subclasses__():
            all.append(type_cls)
            for sub_type_cls in type_cls.__subclasses__():
                all.append(sub_type_cls)
        return all
            
    @classmethod
    def factory(cls, sequence_number, msg, debug=False):
        for type_cls in Msg.type_clss():
            if type_cls.matches_msg_type(msg) and type_cls.matches_msg_sub_type(msg):
                return type_cls(sequence_number, msg, debug=debug)
        return UnsupportedMsg(sequence_number, msg, debug=debug)

In [6]:
#export
class PriceLevelUpdateMsg(Msg):
    "Class for working with a Price Level Update message"

    def __init__(self, sequence_number, msg, debug=False):
        fmt = "<bq8siq"    
        
        self.event_flags, self.timestamp, self.symbol, self.size, self.price = \
            unpack_from(fmt, msg[1:])

        self.symbol = self.symbol.rstrip().decode('UTF-8')
        self.price = self.price / 10000
                
        super().__init__(sequence_number, msg, debug=debug)        

In [7]:
#export
class BuySidePriceLevelUpdageMsg(PriceLevelUpdateMsg):
    "Class for working with a Buy Side Price Level Update message"
    msg_type = b'8'

In [8]:
#export
class SellSidePriceLevelUpdageMsg(PriceLevelUpdateMsg):
    "Class for working with a Sell Side Price Level Update message"
    msg_type = b'5'

In [9]:
#export
class SystemEventMsg(Msg):
    "Class for working with a System Event message"
    
    @classmethod
    def matches_msg_sub_type(cls, msg):
        return msg[1:2] == cls.msg_sub_type

In [10]:
class StartOfMessagesMsg(SystemEventMsg):
    "Class for working with Start of Messages message"
    msg_type = b'S'
    msg_sub_type = b'O'

In [11]:
class StartOfSystemHoursMsg(SystemEventMsg):
    "Class for working with Start of System Hours message"
    msg_type = b'S'
    msg_sub_type = b'S'

In [12]:
class StartOfRegularMarketHoursMsg(SystemEventMsg):
    "Class for working with Start of Regular Market Hours message"
    msg_type = b'S'
    msg_sub_type = b'R'

In [13]:
class EndOfRegularMarketHoursMsg(SystemEventMsg):
    "Class for working with End of Regular Market Hours message"
    msg_type = b'S'
    msg_sub_type = b'M'

In [14]:
class EndOfSystemHoursMsg(SystemEventMsg):
    "Class for working with End of System Hours message"
    msg_type = b'S'
    msg_sub_type = b'E'

In [15]:
class EndOfMessagesMsg(SystemEventMsg):
    "Class for working with End of Messages message"
    msg_type = b'S'
    msg_sub_type = b'C'

In [16]:
class ShortSalePriceTestStatusMsg(Msg):
    "Class for working with Short Sale Price Test Status message"
  
    @classmethod
    def matches_msg_sub_type(cls, msg):
        return unpack("<b", msg[1:2])[0] == cls.msg_sub_type
        
    def __init__(self, sequence_number, msg, debug=False):
        fmt = "<bq8sb"    
        
        self.status, self.timestamp, self.symbol, self.detail = \
            unpack_from(fmt, msg[1:])

        self.symbol = self.symbol.rstrip().decode('UTF-8')
                
        super().__init__(sequence_number, msg, debug=debug)

In [17]:
class ShortSalePriceTestNotInEffect(ShortSalePriceTestStatusMsg):
    "Class for working with Short Sale Price Test Not In Effect message"
    msg_type = b'P'
    msg_sub_type = 0

In [18]:
class ShortSalePriceTestInEffect(ShortSalePriceTestStatusMsg):
    "Class for working with Short Sale Price Test In Effect message"
    msg_type = b'P'
    msg_sub_type = 1

In [19]:
class OperationalHaltStatusMsg(Msg):
    "Class for working with Operational Halt Status message"
    
    @classmethod
    def matches_msg_sub_type(cls, msg):
        return unpack("<c", msg[1:2])[0] == cls.msg_sub_type

    
    def __init__(self, sequence_number, msg, debug=False):
        fmt = "<bq8s"    
        
        self.status, self.timestamp, self.symbol = \
            unpack_from(fmt, msg[1:])

        self.symbol = self.symbol.rstrip().decode('UTF-8')
            
        super().__init__(sequence_number, msg, debug=debug)

In [20]:
class IEXSpecificOperationalTradingHaltMsg(OperationalHaltStatusMsg):
    "Class for working with IEX Specific Operational Trading Halt message"
    msg_type = b'O'
    msg_sub_type = b'O'

In [21]:
class NotOperationallyHaltedOnIEXMsg(OperationalHaltStatusMsg):
    "Class for working with Not Operationally Halted On IEX message"
    msg_type = b'O'
    msg_sub_type = b'N'

In [22]:
class TradingStatusMsg(Msg):
    "Class for working with Trading Status message"

    @classmethod
    def matches_msg_sub_type(cls, msg):
        return unpack("<c", msg[1:2])[0] == cls.msg_sub_type
    
    def __init__(self, sequence_number, msg, debug=False):
        fmt = "<cq12s"    
        
        self.status, self.timestamp, s = \
            unpack_from(fmt, msg[1:])

        self.symbol, self.reason = s[0:7], s[8:11]
        
        self.symbol = self.symbol.rstrip().decode('UTF-8')
        self.reason = self.reason.rstrip().decode('UTF-8')

        super().__init__(sequence_number, msg, debug=debug)

In [23]:
class TradingHaltedAcrossAllUSEquityMarketsMsg(TradingStatusMsg):
    "Class for working with Trading Haldted Across All US Equity Markets message"
    msg_type = b'H'
    msg_sub_type = b'H'  

In [24]:
class TradingHaltReleasedIntoAnOrderAcceptancePeriodOnIEXMsg(TradingStatusMsg):
    "Class for working with Trading Halt Released Into An Order Acceptance Period On IEX message"
    msg_type = b'H'
    msg_sub_type = b'O'

In [25]:
class TradingPausedAndOrderAcceptancePeriodOnIEXMsg(TradingStatusMsg):
    "Class for working with TradingPausedAndOrderAcceptancePeriodOnIEX message"
    msg_type = b'H'
    msg_sub_type = b'P'

In [26]:
class TradingOnIEX(TradingStatusMsg):
    "Class for working with TradingOnIEX message"
    msg_type = b'H'
    msg_sub_type = b'T'

In [27]:
class SecurityDirectoryMsg(Msg):
    "Class for working with SecurityDirectory message"
    msg_type = b'D'
    
    def __init__(self, sequence_number, msg, debug=False):
        fmt = "<bq8siqb"    
        
        self.flags, self.timestamp, self.symbol, self.round_lot_size, self.adjusted_poc_price, self.luld_tier = \
            unpack_from(fmt, msg[1:])
        
        self.symbol = self.symbol.rstrip().decode('UTF-8')

        self.adjusted_poc_price = self.adjusted_poc_price / 10000
        
        super().__init__(sequence_number, msg, debug=debug)

In [28]:
#export
class UnsupportedMsg(Msg):
    "Class for catching unsupported messages"

In [29]:
#export
class Pkt():
    "Class for working with a DEEP packet"
    def __init__(self, pkt, debug=False):
        self.debug = debug
        if self.debug:
            self.bytes = pkt
        
        self.index = -1
        
        self.version     = b"\x01"
        self.reserved    = b"\x00"
        self.protocol_id = b"\x04\x80"        
        self.channel_id  = b"\x01\x00\x00\x00"
            
        header = (
            self.version + 
            self.reserved + 
            self.protocol_id + 
            self.channel_id
        )
    
        pkt = pkt[pkt.find(header) + len(header):]

        fmt = "<lhhqqq"
        
        self.session_id, self.payload_len, self.msg_count, self.stream_offset, self.first_msg_seq_num, self.send_time = \
            unpack_from(fmt, pkt)
                  
        self.msgs = pkt[calcsize(fmt):]        

    def next_msg(self):
        self.index += 1
        if self.index == self.msg_count:
            return None
        else:
            frag, msgs = self.msgs[0:2], self.msgs[2:]            
            msg_len = unpack("<h", frag)[0]
            msg, self.msgs = msgs[0:msg_len], msgs[msg_len:]
            return Msg.factory(self.first_msg_seq_num + self.index, msg, debug=self.debug)

In [30]:
#export 
class Deep():
    "Class for working with a DEEP export file"
    def __init__(self, path, debug=False):
        self._pkt = None
        self.pcap = pcap(path)
        self.debug = debug
        
    def __set_pkt(self, ts, _pkt):
        self._pkt = Pkt(_pkt, debug=self.debug) 
        
    def next_pkt(self):
        if self.pcap.dispatch(1, self.__set_pkt) == 1:
            return(self._pkt) 
        else:
            return None

In [31]:
deep = Deep('input\data_feeds_20210924_20210924_IEXTP1_DEEP1.0.pcap', debug=True)

In [32]:
n = 0
while pkt := deep.next_pkt():
    while msg := pkt.next_msg():
        if type(msg) == UnsupportedMsg:
            print(vars(msg))
            raise

{'sequence_number': 34016, 'type': b'T', 'debug': True, 'bytes': b'T@\x06\xd5\x97\x14G\xc0\xa7\x16CAPR    d\x00\x00\x00\xe0\xf6\x00\x00\x00\x00\x00\x00\xc0\xe8*\x00\x00\x00\x00\x00'}


RuntimeError: No active exception to reraise

In [None]:
deep = Deep('input\data_feeds_20210924_20210924_IEXTP1_DEEP1.0.pcap')

In [None]:
chunk_size = 1_000_000

In [None]:
def write_chunk(msgs):
    df = pd.DataFrame({
        'flags': [m.event_flags for m in msgs],
        'timestamp': [m.timestamp for m in msgs],
        'symbol': [m.symbol for m in msgs],
        'size': [m.size for m in msgs],
        'price': [m.price for m in msgs],
        'side': ['buy' if type(msg) == BuySidePriceLevelUpdageMsg else 'sell' for m in msgs]
    })
    table = pa.Table.from_pandas(df)
    pq.write_table(table, 'output\price_level_updates_{}.parquet'.format(msgs[0].timestamp))

In [None]:
i = 1
msgs = []
while pkt := deep.next_pkt():
    while msg := pkt.next_msg():
        if type(msg) != UnsupportedMsg:
            msgs.append(msg)                    
        if len(msgs) >= chunk_size:
            write_chunk(msgs)
            msgs = []
            i += 1
            print(".", end="")

# write remaining less than chunk_size left overs
write_chunk(msgs)
print(".")