In [1]:
import sys
from pathlib import Path

class CONFIG:
    """Filesystem locations used by this notebook."""

    # Parent of the directory the kernel was started in.
    rootdir = Path.cwd().parent
    # Directory holding the message specs: <rootdir>/structs/specs.
    specsdir = rootdir.joinpath('structs', 'specs')


Making one named constant (a validator) for each letter of the alphabet, as a shortcut because of time constraints.

In [2]:
import re
import parse

# Every ASCII letter, lowercase first and then uppercase.
LETTERS = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"

def make_letter_validator(letter, msgcls=None):
    """Build a ``parse`` custom type that only accepts ``letter``.

    Args:
        letter: the single character the returned converter must match.
        msgcls: accepted for call compatibility; not used by this function.

    Returns:
        A ``(type_name, converter)`` pair, where ``type_name`` is
        ``"msgtype_<letter>"`` and ``converter`` carries the regex pattern
        ``[<letter>]`` attached through ``parse.with_pattern``.
    """

    def letter_check(text):
        # Sanity checks on the matched text before handing it back unchanged.
        assert len(text) == 1
        assert text == letter
        return text

    # Same effect as decorating the function with @parse.with_pattern(...).
    letter_check = parse.with_pattern(f'[{letter}]')(letter_check)

    return f"msgtype_{letter}", letter_check

# Maps "msgtype_<letter>" -> converter for every character in LETTERS.
# Built in a single expression so no loop temporaries (letter/key/check)
# are left behind in the notebook's global namespace.
message_indicators = dict(
    make_letter_validator(letter) for letter in LETTERS
)


A very lightweight parser that uses Python's format-string language. This is pretty close to the smallest/simplest it can get while still keeping field names, padding characters, etc.

In [3]:
# Parser for the "Add Order (short)" line layout. Each entry below is one
# named field; the pieces are concatenated into a single format string.
# `message_indicators` supplies the custom `msgtype_A` type used for the
# message-type slot.
AddOrderShortParser = parse.Parser(
    "".join([
        "{timestamp:>08}",
        "{message_type:msgtype_A}",
        "{order_id:12}",
        "{side_indicator:1}",
        "{shares:>06}",
        "{stock_symbol:6}",
        "{price:010}",
        "{display:1}",
    ]),
    message_indicators,
)


In [4]:
# Sample raw line used by the demo cells in this notebook.
MESSAGE = "28800011AAK27GA0000DTS000100SH    0000619200Y"

In [5]:
# Parse the sample line and display only the named fields of the result.
AddOrderShortParser.parse(MESSAGE, evaluate_result=True).named

{'timestamp': '28800011',
 'message_type': 'A',
 'order_id': 'AK27GA0000DT',
 'side_indicator': 'S',
 'shares': '000100',
 'stock_symbol': 'SH    ',
 'price': '0000619200',
 'display': 'Y'}

It also generates regex patterns, which might be of some use browser-side.

In [6]:
# Display the regex behind the parser. NOTE: `_match_re` is an
# underscore-prefixed (private) attribute of the parser object.
AddOrderShortParser._match_re.pattern

'^ *(?P<timestamp>.{8,}?)(?P<message_type>[A]) *(?P<order_id>.{12,}?) *(?P<side_indicator>.{1,}?) *(?P<shares>.{6,}?) *(?P<stock_symbol>.{6,}?) *(?P<price>.{10,}?) *(?P<display>.{1,}?)$'

In [7]:
# Parse the sample line and display the whole result object.
AddOrderShortParser.parse(MESSAGE, evaluate_result=True)

<Result () {'timestamp': '28800011', 'message_type': 'A', 'order_id': 'AK27GA0000DT', 'side_indicator': 'S', 'shares': '000100', 'stock_symbol': 'SH    ', 'price': '0000619200', 'display': 'Y'}>

In [8]:
# Display the format string stored on the parser (`_format` is a private attribute).
AddOrderShortParser._format

'{timestamp:>08}{message_type:msgtype_A}{order_id:12}{side_indicator:1}{shares:>06}{stock_symbol:6}{price:010}{display:1}'

### Make [pydantic](https://pydantic-docs.helpmanual.io/) models out of them

Up to here, none of the parsers have concerned themselves with validation or even the content itself.
`MessageType` is the only slot whose content is checked, the rest could be gibberish so long as
they have the right number of chars. It's assumed that those parsers will be used for internal,
fast/compact data transfer that gets checked somewhere else up the stream.


These models sit further up the stream and are used for data that should be content-checked. This is the public SDK, available to third-party developers.
- Generated using the same public specs as parsers. These do content validation, not solely positional validation.
- Available to devs who want OOP validation (vs "#/ref...")
- The models/fields/types all have type annotations on them, so it'll work with static type checkers


In [9]:
from enum import Enum

def oneof(name=None, attrs=None, **kwargs):
    """Create an ``Enum`` from a mapping, a collection, or keyword arguments.

    Accepted call shapes:
        oneof(attrs)              -> enum named ``'enum'``
        oneof(name, attrs)        -> enum named ``name``
        oneof(name, **members)    -> enum named ``name``
        oneof(**members)          -> enum named ``'enum'``

    Args:
        name: name of the generated enum class. When it is the only
            positional argument and no keyword members are given, it is
            treated as ``attrs`` and the name defaults to ``'enum'``.
        attrs: a dict, or a set/list/tuple of values. Any other value is
            handed to ``Enum`` untouched.
        **kwargs: members given as keywords; when present they replace
            ``attrs``.

    Returns:
        The new ``Enum`` class. Dict (and keyword) input is inverted, so the
        dict's *values* become member names and its *keys* become member
        values. Collection items become members whose name equals their value.
    """
    if kwargs:
        attrs = kwargs
        # A positional argument next to keyword members is the enum's name.
        # Previously it was shifted into `attrs` and then thrown away, so the
        # enum always ended up being called 'enum'.
        if not isinstance(name, str):
            name = 'enum'
    elif attrs is None:
        # Only one positional argument: it is the members, not the name.
        name, attrs = 'enum', name

    if isinstance(attrs, dict):
        attrs = {label: code for code, label in attrs.items()}
    elif isinstance(attrs, (set, list, tuple)):
        attrs = {item: item for item in attrs}

    return Enum(name, attrs)

In [10]:
def constant(val, **kwargs):
    """Return a constrained string type that only accepts exactly ``val``.

    Args:
        val: the single allowed value; it is converted to text with ``str``.
        **kwargs: forwarded unchanged to ``constr``.

    Returns:
        The class produced by ``constr`` with an anchored regex for ``val``.
    """
    # Escape the value so regex metacharacters in it are matched literally
    # rather than being interpreted as pattern syntax.
    return constr(regex=f"^{re.escape(str(val))}$", **kwargs)

from boltons.strutils import iter_splitlines

import re

# Matches a "NAME = value" line: skips leading characters that are neither
# word characters nor '=', captures the word before '=' as group 1 and the
# rest of the line after '=' as group 2 (whitespace around '=' is not captured).
re_meta = re.compile(r"^[^=\w]*\b(\w+)\s*=\s*(.*)$")

def parse_flags_from_string(value):
    """Yield the ``re_meta`` capture groups for each matching line of ``value``.

    Lines that do not match ``re_meta`` are skipped.
    """
    for line in iter_splitlines(value):
        match = re_meta.match(line)
        if not match:
            continue
        yield match.groups()

class MissingFlags(Exception):
    """Raised when a description does not yield enough flags to build an enum."""
            
def parse_flags_enum(name, value, **kw):
    """Build an enum from the ``KEY = text`` lines found in ``value``.

    Args:
        name: field name; passed through ``to_class_name`` to name the enum.
        value: text scanned by ``parse_flags_from_string``.
        **kw: accepted but not used.

    Returns:
        The enum produced by ``oneof`` from the parsed flag pairs.

    Raises:
        MissingFlags: if one or zero flag lines were found.
    """
    pairs = list(parse_flags_from_string(value))

    # Skip some cases (like with Y as true, empty space as false)
    if len(pairs) <= 1:
        raise MissingFlags("Not enough flags found to work properly")

    return oneof(to_class_name(name), dict(pairs))

In [11]:
import re
from boltons.strutils import slugify, under2camel, iter_splitlines
from pydantic import constr, create_model, Schema, BaseModel
from pydantic.utils import lenient_issubclass
from enum import Enum
from cboe.pitch import validators

try:
    from typing import _TypingBase as typing_base  # type: ignore
except ImportError:
    from typing import _Final as typing_base  # type: ignore


def to_class_name(text: str) -> str:
    """Convert free text to a class-style name: ``slugify`` then ``under2camel``."""
    slug = slugify(text)
    return under2camel(slug)


# Constrained string types, one per data type; each is validated against the
# matching regex from `cboe.pitch.validators`.
PrintableAscii = constr(regex=validators.PRINTABLE_RE)
Alpha = constr(regex=validators.ALPHA_RE)
Numeric = constr(regex=validators.NUMERIC_RE)
Base36Numeric = constr(regex=validators.BASE36_NUMERIC_RE)
Price = constr(regex=validators.PRICE_RE)
Timestamp = constr(regex=validators.TIMESTAMP_RE)

# Lookup from a spec's 'Data Type' string to the constrained type above.
pitch_type_map = {
    'Timestamp': Timestamp,
    'Base 36 Numeric': Base36Numeric,
    'Alpha': Alpha,
    'Price': Price,
    'Printable ASCII': PrintableAscii,
    'Numeric': Numeric
}

def is_msgtype(v):
    """Return True when ``v`` (or its type) is, subclasses, or is an instance of ``MsgType``.

    NOTE(review): ``MsgType`` is not defined in the visible part of this
    notebook; calling this function will raise ``NameError`` unless it is
    provided elsewhere -- confirm.
    """
    vt = _get_type(v)
    if lenient_issubclass(vt, MsgType):
        return True
    if isinstance(vt, MsgType):
        return True
    return vt is MsgType

def _get_type(v):
    if isinstance(v, typing_base) or isinstance(v, type):
        return v
    return type(v)
    
        
def get_pitch_native_type(spec):
    """Pick the Python type used to validate one field of a message spec.

    Args:
        spec: mapping with at least the keys ``'Data Type'``,
            ``'Field Name'``, ``'Length'`` and ``'Description'``.

    Returns:
        For fields of length 1 whose description yields enough flags, an enum
        built by ``parse_flags_enum``; otherwise the constrained type
        registered in ``pitch_type_map`` for the field's data type.

    Raises:
        Exception: if the data type is not present in ``pitch_type_map``.
    """
    dtype, name = spec['Data Type'], spec['Field Name']

    if dtype not in pitch_type_map:
        raise Exception(f"Invalid data type: {dtype} - {spec}")

    pytype = pitch_type_map[dtype]

    # Only single-character fields are candidates for flag enums.
    if spec['Length'] != 1:
        return pytype

    try:
        # Hack: try to turn the free-text description into an enum.
        return parse_flags_enum(name, spec['Description'])
    except MissingFlags:
        # Not enough flags to parse safely; fall back to the regular type.
        return pytype

def get_slice(val: str, offset: int, length: int):
    """Return the ``length`` characters of ``val`` starting at index ``offset``."""
    end = offset + length
    return val[offset:end]


def get_line_slice(line, field):
    """Cut one field's text out of ``line``.

    The position comes from the ``'offset'`` and ``'length'`` entries stored
    in ``field.schema.extra``.
    """
    meta = field.schema.extra
    return get_slice(line, offset=meta['offset'], length=meta['length'])

def trim_starting_s(val):
    """Drop a single leading ``'S'`` from ``val`` if present; otherwise return it unchanged."""
    # Only the first character is removed, and only when it is 'S'.
    return val[1:] if val.startswith('S') else val

def ensure_list(v):
    """Return ``v`` unchanged when it is already a list, else ``list(v)``."""
    if isinstance(v, list):
        return v
    return list(v)






    

In [12]:
from cboe.pitch import utils

class PitchMessage(BaseModel):
    """Base class for the generated fixed-width message models.

    The helpers read ``offset`` and ``length`` (and, for the message-type
    field, ``const``) from each field's ``schema.extra`` to slice raw lines
    and to build ``parse`` format strings.
    """

    @property
    def message_type(self):
        """Value stored under the ``"Message Type"`` field."""
        # TODO: map all fields to snake case
        # The field name contains a space, hence getattr instead of dot access.
        return getattr(self, "Message Type")

    @classmethod
    def total_width(cls):
        """Sum of the ``length`` of every field on the model."""
        return sum(f.schema.extra["length"] for f in cls.__fields__.values())

    @classmethod
    def sorted_fields(cls):
        """Return ``(name, field)`` pairs ordered by each field's ``offset``."""
        return sorted(
            cls.__fields__.items(),
            key=lambda item: item[1].schema.extra["offset"],
        )

    @classmethod
    def parse_line(cls, line: str):
        """Build an instance by slicing ``line`` at each field's offset/length.

        Assumes the line has all the proper data.
        """
        data = {k: get_line_slice(line, v) for k, v in cls.sorted_fields()}
        return cls.parse_obj(data)

    @classmethod
    def format_str(cls):
        """Build the ``parse`` format string for this model, fields in offset order."""
        parts = []
        for k, v in cls.sorted_fields():
            if k == "Message Type":
                # The message-type slot uses the per-letter custom type.
                parts.append("{message_type:msgtype_%s}" % v.schema.extra['const'])
                continue
            parts.append(
                utils.make_named_fmt_str(slugify(k), width=v.schema.extra['length'])
            )
        return "".join(parts)

    @classmethod
    def pattern(cls):
        """Regex pattern generated by this model's parser."""
        return cls.get_parser()._match_re.pattern

    @classmethod
    def get_parser(cls):
        """Create a ``parse.Parser`` for this model's format string."""
        from parse import Parser
        return Parser(cls.format_str(), message_indicators)

    @classmethod
    def parse(cls, val: str):
        """Parse the given string into an instance of this message model.

        Returns ``None`` when the string does not match the model's format.

        TODO: Handle parse errors
        """
        # trim_starting_s takes only the string; passing `cls` as well raised
        # TypeError on every call.
        val = trim_starting_s(val)
        data = cls.get_parser().parse(val)
        if not data:
            return None

        # format_str() names the parser groups with slugified field names,
        # while the model's fields keep their original names. Translate the
        # keys back, otherwise validation can never find the fields.
        by_slug = {slugify(name): name for name in cls.__fields__}
        if "Message Type" in cls.__fields__:
            by_slug["message_type"] = "Message Type"
        values = {by_slug.get(key, key): value for key, value in data.named.items()}
        return cls.parse_obj(values)
    
    

# Registry of generated models, keyed by message-type code; filled by make_pitch_model.
MESSAGE_TYPES = {}

def make_pitch_model(data):
    """Generate a pydantic model class from one message spec.

    Args:
        data: mapping with the keys ``'Name'``, ``'Section'``,
            ``'Description'`` and ``'Fields'``; each entry of ``'Fields'``
            provides ``'Field Name'``, ``'Data Type'``, ``'Length'``,
            ``'Offset'`` and ``'Description'``.

    Returns:
        The new model class, a subclass of ``PitchMessage``. It is also stored
        in ``MESSAGE_TYPES`` under the message-type code (the key is ``None``
        when the spec has no ``'Message Type'`` field).
    """
    model_fields = {}
    msgtype = None

    for spec in data['Fields']:
        # Everything here ends up in the field's schema "extra" metadata.
        schema_kwargs = {
            'description': spec['Description'],
            'length': spec['Length'],
            'offset': spec['Offset'],
            'title': spec['Field Name'],
            'dtype': spec['Data Type'],
        }

        name = spec['Field Name']
        if name == 'Message Type':
            # For this field the spec's 'Data Type' holds the message code.
            msgtype = spec['Data Type']
            schema_kwargs['const'] = msgtype
            pytype = constant(msgtype)
        else:
            pytype = get_pitch_native_type(spec)

        model_fields[name] = (pytype, Schema(..., **schema_kwargs))

    config_cls = type('Config', (), {
        'title': data['Name'],
        'section': data['Section'],
        'description': data['Description'],
        'use_enum_values': True,
    })
    # Intermediate base so the generated model picks up the Config above.
    base_cls = type('SubBase', (PitchMessage,), {'Config': config_cls})

    model = create_model(
        to_class_name(data['Name']) + 'MessageModel',
        __base__=base_cls,
        **model_fields,
    )
    model.__doc__ = data['Description']

    MESSAGE_TYPES[msgtype] = model
    return model





##### Same as last time: load the spec YAML files and use them to generate new code

In [13]:
from pathlib import Path
from cboe.pitch.specs import load_specs_in

# Run make_pitch_model over the specs in CONFIG.specsdir and display the result as a list.
# NOTE(review): load_specs_in is project code not shown here -- presumably it
# calls the given function once per spec file; confirm.
list(load_specs_in(CONFIG.specsdir, make_pitch_model))

[TradeBreakMessageModel,
 OrderCancelMessageModel,
 AuctionSummaryMessageModel,
 TradingStatusMessageModel,
 SymbolClearMessageModel,
 AddOrderShortMessageModel,
 AuctionUpdateMessageModel,
 TradeShortMessageModel,
 RetailPriceImprovementMessageModel,
 TradeLongMessageModel,
 OrderExecutedMessageModel,
 AddOrderLongMessageModel]

In [14]:
# Slice the sample line into the model registered for message type 'A'.
MESSAGE_TYPES['A'].parse_line(MESSAGE)

<AddOrderShortMessageModel Timestamp='28800011' Message Type='A' Order ID='AK27GA0000DT' Side Indicator='S' Shares='000100' Stock Symbol='SH    ' Price='0000619200' Display='Y'>

The models are all annotated, and can build json/openapi schema with expected data type validation

In [15]:
# Fields of the 'X' model, ordered by their offset.
MESSAGE_TYPES['X'].sorted_fields()

[('Timestamp', <Field(Timestamp type=ConstrainedStrValue required)>),
 ('Message Type', <Field(Message Type type=ConstrainedStrValue required)>),
 ('Order ID', <Field(Order ID type=ConstrainedStrValue required)>),
 ('Canceled Shares',
  <Field(Canceled Shares type=ConstrainedStrValue required)>)]

In [16]:
# Regex produced by the parser that the 'A' model builds from its fields.
MESSAGE_TYPES['A'].pattern()

'^ *(?P<timestamp>.{8,}?)(?P<message_type>[A]) *(?P<order_id>.{12,}?) *(?P<side_indicator>.{1,}?) *(?P<shares>.{6,}?) *(?P<stock_symbol>.{6,}?) *(?P<price>.{10,}?) *(?P<display>.{1,}?)$'

In [17]:
# Schema dict of the 'X' model, including the per-field length/offset/dtype metadata.
MESSAGE_TYPES['X'].schema()

{'title': 'Order Cancel',
 'description': 'Order Cancel messages are sent when a visible order on the Cboe book is canceled in whole or in part.\nNOTE:  Order Modification messages (4.4.x) refer to an Order ID previously sent with an Add Order message. Multiple Order Modification messages may modify a single order and the effects are cumulative. Order Modification messages always reduce the remaining shares in the referenced open order by the number of shares indicated. When the remaining shares for an order reach zero, the order is dead and should be removed from the book.',
 'type': 'object',
 'properties': {'Timestamp': {'title': 'Timestamp',
   'description': 'Timestamp',
   'length': 8,
   'offset': 0,
   'dtype': 'Timestamp',
   'pattern': '^[0-9]{8}$',
   'type': 'string'},
  'Message Type': {'title': 'Message Type',
   'description': 'Order Cancel message',
   'length': 1,
   'offset': 8,
   'dtype': 'X',
   'const': 'X',
   'pattern': '^X$',
   'type': 'string'},
  'Order ID':

##### Test functions to parse the example data file into the models

This can obviously also be achieved using the Kaitai classes. It is done here to test correctness: both sets of models come from the same specs, so their output should be interchangeable.

In [18]:
from boltons.strutils import iter_splitlines
from boltons.iterutils import bucketize

#
# Helpers to kickstart message/line parsing
#

def parse_line(val: str):
    """Parse one raw line into the model registered for its message type.

    A single leading ``'S'`` is stripped first; the message-type code is then
    read from index 8 of the remaining text.
    NOTE(review): this assumes every message type stores its code at offset 8
    (as the 'X' schema shows) -- confirm against the specs.

    Raises:
        KeyError: if no model is registered for the message-type code.
        IndexError: if the line is too short to contain a message-type code.
    """
    value = trim_starting_s(val)
    msgtypecode = value[8]

    # Keep the try block limited to the registry lookup, so a KeyError raised
    # while the model parses the line is not misreported as an unknown type.
    try:
        model = MESSAGE_TYPES[msgtypecode]
    except KeyError as e:
        raise KeyError(f"No Message type found for: {msgtypecode}") from e

    return model.parse_line(value)


def parse_lines(lines):
    """Lazily parse each line of ``lines``, skipping whitespace-only ones."""
    for line in lines:
        # Lines that strip down to an empty string are ignored.
        if str.strip(line):
            yield parse_line(line)
        
def parse_text(text):
    """Lazily parse every non-blank line of a multi-line string."""
    yield from parse_lines(iter_splitlines(text))

def group_messages(messages):
    """Bucket ``messages`` by their ``message_type`` attribute using ``bucketize``."""

    def by_message_type(message):
        return message.message_type

    return bucketize(messages, by_message_type)

In [19]:
# Read the whole sample data file into memory as text.
# NOTE(review): hard-coded absolute path; the notebook only runs where this file exists.
RAW_DATA = Path("/tmp/data_med").resolve().read_text()

In [20]:
# parse_text is a generator function, so nothing is parsed until this is iterated.
message_parser = parse_text(RAW_DATA)

In [21]:
# Exhaust the generator: every line is parsed here and kept in memory.
messages = list(message_parser)

In [22]:
# Peek at the first five parsed messages.
messages[0:5]

[<AddOrderShortMessageModel Timestamp='28800011' Message Type='A' Order ID='AK27GA0000DT' Side Indicator='S' Shares='000100' Stock Symbol='SH    ' Price='0000619200' Display='Y'>,
 <AddOrderShortMessageModel Timestamp='28800012' Message Type='A' Order ID='BK27GA00000K' Side Indicator='B' Shares='001000' Stock Symbol='SSO   ' Price='0000763800' Display='Y'>,
 <AddOrderShortMessageModel Timestamp='28800012' Message Type='A' Order ID='BK27GA00000L' Side Indicator='B' Shares='001000' Stock Symbol='SSO   ' Price='0000763600' Display='Y'>,
 <AddOrderShortMessageModel Timestamp='28800012' Message Type='A' Order ID='BK27GA00000M' Side Indicator='S' Shares='001000' Stock Symbol='SSO   ' Price='0000764800' Display='Y'>,
 <AddOrderShortMessageModel Timestamp='28800012' Message Type='A' Order ID='AK27GA0000DU' Side Indicator='S' Shares='001000' Stock Symbol='SDS   ' Price='0000549300' Display='Y'>]

In [23]:
# Keep only the messages whose message_type is 'X'.
[x for x in messages if x.message_type == 'X']

[<OrderCancelMessageModel Timestamp='28800168' Message Type='X' Order ID='1K27GA00000Y' Canceled Shares='000100'>,
 <OrderCancelMessageModel Timestamp='28800168' Message Type='X' Order ID='1K27GA00000V' Canceled Shares='000100'>,
 <OrderCancelMessageModel Timestamp='28800168' Message Type='X' Order ID='1K27GA00000W' Canceled Shares='000100'>,
 <OrderCancelMessageModel Timestamp='28800174' Message Type='X' Order ID='5K27GA00000K' Canceled Shares='000100'>,
 <OrderCancelMessageModel Timestamp='28800179' Message Type='X' Order ID='5K27GA00000J' Canceled Shares='000100'>,
 <OrderCancelMessageModel Timestamp='28800180' Message Type='X' Order ID='1K27GA00000V' Canceled Shares='000100'>,
 <OrderCancelMessageModel Timestamp='28800180' Message Type='X' Order ID='1K27GA00000Y' Canceled Shares='000100'>,
 <OrderCancelMessageModel Timestamp='28800180' Message Type='X' Order ID='1K27GA00000X' Canceled Shares='000100'>,
 <OrderCancelMessageModel Timestamp='28800180' Message Type='X' Order ID='AK27GA