In [1]:
#!/usr/bin/env python
# coding: utf-8

import pandas as pd
pd.set_option('display.max_columns', None)
pd.set_option('display.max_rows', None)
import functools
import numpy as np

class PitchData():
    """Wrap a frame of raw PITCH messages and split it by message type.

    The wrapped frame must have a ``raw_message_data`` column holding one
    message string per row. The frame is stored by reference, so the columns
    added by ``extract_raw_data`` also appear on the caller's object.
    """

    def __init__(self, raw_df):
        self.df = raw_df

    def extract_raw_data(self):
        """Add ``message_length`` and ``message_type`` columns to ``self.df``.

        ``message_type`` is the single character at index 9 of the raw
        message; ``message_length`` is the length of the raw message.
        """
        raw_messages = self.df['raw_message_data']
        self.df['message_length'] = raw_messages.str.len()
        self.df['message_type'] = raw_messages.str[9:10].astype(str)

    def createMultipleDataFrames(self):
        """Return one re-indexed frame per message type.

        Requires ``extract_raw_data`` to have been called first, because the
        split is driven by the ``message_type`` column.

        Returns:
            Tuple ``(active_df, cancel_df, executed_df, trade_df)`` holding
            the rows whose type is 'A', 'X', 'E' and 'P' respectively.
        """
        message_types = self.df['message_type']
        active_df, cancel_df, executed_df, trade_df = (
            self.df[message_types == type_code].reset_index(drop=True)
            for type_code in ('A', 'X', 'E', 'P')
        )
        return active_df, cancel_df, executed_df, trade_df


class AggregateData():
    """Parse per-type PITCH message frames and rank symbols by traded volume.

    The four frames passed in must each have a ``raw_message_data`` column.
    Intended call order: ``process_orders`` -> ``process_order_aggregates``
    -> ``combine_order_aggregates`` -> ``process_volume_aggregate`` ->
    ``top_ten_movers``.

    The parse step writes its columns onto the frames it was given (they are
    not copied), and it is safe to run more than once.
    """

    # Fixed-width layouts as (column, start, stop) slices. Offsets apply to
    # the message AFTER its leading 'S' has been removed.
    _HEADER_FIELDS = (
        ('time_stamp', 0, 8),
        ('order_ID', 9, 21),
    )
    _ACTIVE_FIELDS = _HEADER_FIELDS + (
        ('side_indicator', 21, 22),
        ('shares', 22, 28),
        ('share_symbol', 28, 34),
        ('price', 34, 44),
        ('display', 44, 45),
    )
    _CANCEL_FIELDS = _HEADER_FIELDS + (
        ('cancelled_shares', 21, 27),
    )
    _EXECUTED_FIELDS = _HEADER_FIELDS + (
        ('executed_shares', 21, 27),
        ('execution_ID', 27, 39),
    )
    _TRADE_FIELDS = _HEADER_FIELDS + (
        ('side_indicator', 21, 22),
        ('shares', 22, 28),
        ('share_symbol', 28, 34),
        ('price', 34, 44),
        ('execution_id', 44, 56),
    )

    def __init__(self, active_df, cancel_df, executed_df, trade_df):
        self.active_df = active_df
        self.cancel_df = cancel_df
        self.executed_df = executed_df
        self.trade_df = trade_df
        # Populated by process_order_aggregates().
        self.active_agg = None
        self.cancel_agg = None
        self.executed_agg = None
        self.trade_agg = None
        # Populated by combine_order_aggregates() / process_volume_aggregate().
        self.orders_agg = None
        self.volume_agg = None

    @staticmethod
    def _parse_messages(frame, fields, share_column):
        """Slice ``raw_message_data`` into the given columns, in place.

        Args:
            frame: DataFrame with a ``raw_message_data`` string column.
            fields: Iterable of ``(column, start, stop)`` slice definitions.
            share_column: Name of the sliced column to convert to ``int``.

        Returns:
            The same ``frame`` object, with the parsed columns added.
        """
        # Remove the leading 'S' only when it is present. Dropping the first
        # character unconditionally would eat a timestamp digit on a second
        # call and shift every field offset.
        frame['raw_message_data'] = frame['raw_message_data'].str.replace(
            r'^S', '', regex=True)
        message = frame['raw_message_data']
        for column, start, stop in fields:
            frame[column] = message.str.slice(start, stop, 1)
        frame[share_column] = frame[share_column].astype(int)
        return frame

    # Parse and process AddOrder, active messages based on the CBOE book format
    def processActiveOrders(self):
        """Parse the add-order frame; ``shares`` is converted to int."""
        return self._parse_messages(self.active_df, self._ACTIVE_FIELDS, 'shares')

    # Parse and process Cancel Order messages based on the CBOE book format
    def processCancelOrders(self):
        """Parse the cancel frame; ``cancelled_shares`` is converted to int."""
        return self._parse_messages(
            self.cancel_df, self._CANCEL_FIELDS, 'cancelled_shares')

    # Parse and process Execute Order messages based on the CBOE book format
    def processExecutedOrders(self):
        """Parse the execution frame; ``executed_shares`` is converted to int."""
        return self._parse_messages(
            self.executed_df, self._EXECUTED_FIELDS, 'executed_shares')

    # Parse and process Trade messages based on the CBOE book format
    def processTradeOrders(self):
        """Parse the trade frame; ``shares`` is converted to int."""
        return self._parse_messages(self.trade_df, self._TRADE_FIELDS, 'shares')

    # Parent function to parse and create different dataframes for each message type
    def process_orders(self):
        """Parse all four message frames."""
        self.active_df = self.processActiveOrders()
        self.cancel_df = self.processCancelOrders()
        self.executed_df = self.processExecutedOrders()
        self.trade_df = self.processTradeOrders()

    # Gets the sums of all the shares for a share_symbol which is grouped by order-ID
    def process_order_aggregates(self):
        """Sum the share counts per order for each message type.

        Add-order and trade frames are grouped by order ID and symbol;
        cancel and execution frames carry no symbol and are grouped by
        order ID only.
        """
        self.active_agg = (
            self.active_df.groupby(by=['order_ID', 'share_symbol'])
            .agg({'shares': 'sum'}).reset_index())
        self.cancel_agg = (
            self.cancel_df.groupby(by=['order_ID'])
            .agg({'cancelled_shares': 'sum'}).reset_index())
        self.executed_agg = (
            self.executed_df.groupby(by=['order_ID'])
            .agg({'executed_shares': 'sum'}).reset_index())
        self.trade_agg = (
            self.trade_df.groupby(by=['order_ID', 'share_symbol'])
            .agg({'shares': 'sum'}).reset_index())

    # Combines the per-type aggregates into one row per order. Also generates
    # a column called order_volume (sum of executed + traded shares)
    def combine_order_aggregates(self):
        """Outer-join the four per-order aggregates on ``order_ID``.

        The result is stored in ``self.orders_agg``. Missing share counts
        become 0 and missing symbols become the placeholder string 'NaN'.
        ``stock_symbol`` takes the add-order symbol when one exists and the
        trade-message symbol otherwise.
        """
        data_frames = [self.active_agg, self.cancel_agg,
                       self.executed_agg, self.trade_agg]
        merged = functools.reduce(
            lambda left, right: pd.merge(left, right, on=['order_ID'], how='outer'),
            data_frames)
        # Only the first and last frames share column names, so the _x/_y
        # suffixes always refer to the add-order and trade aggregates.
        merged = merged.rename(columns={
            'share_symbol_x': 'ActiveMessage_Share_Symbol',
            'share_symbol_y': 'TradeMessage_Share_Symbol',
            'shares_x': 'active_shares',
            'shares_y': 'traded_shares',
        })
        # Fill by column kind instead of writing the integer 0 into string
        # columns and patching it back to 'NaN' afterwards.
        for column in ('active_shares', 'cancelled_shares',
                       'executed_shares', 'traded_shares'):
            merged[column] = merged[column].fillna(0)
        for column in ('ActiveMessage_Share_Symbol', 'TradeMessage_Share_Symbol'):
            merged[column] = merged[column].fillna('NaN')
        merged['stock_symbol'] = np.where(
            merged['ActiveMessage_Share_Symbol'] == 'NaN',
            merged['TradeMessage_Share_Symbol'],
            merged['ActiveMessage_Share_Symbol'])
        merged['order_volume'] = merged['executed_shares'] + merged['traded_shares']
        self.orders_agg = merged

    # orders the sum of shares for each symbol in the file in descending order
    def process_volume_aggregate(self):
        """Sum ``order_volume`` per ``stock_symbol``, largest first."""
        self.volume_agg = (
            self.orders_agg.groupby(by=['stock_symbol'])
            .agg({'order_volume': 'sum'})
            .sort_values(by='order_volume', ascending=False)
            .reset_index())

    # gets the top-10 movers for the given file.
    def top_ten_movers(self):
        """Return the first ten rows of the volume ranking."""
        return self.volume_agg.head(10)
        
        

def main(filepath=r'pitch_example_data'):
    """Read a PITCH message file and print its ten highest-volume symbols.

    Args:
        filepath: Path of the file to read; each line is loaded as one raw
            message. Defaults to ``pitch_example_data`` in the working
            directory.

    Returns:
        None. If the file cannot be read, a message is printed and the
        function returns without processing anything.
    """
    try:
        raw_df = pd.read_csv(filepath, header=None, names=['raw_message_data'])
    except IOError:
        print(str(filepath) + " is not a valid file path" + " or file may not be existent")
        # Stop here: without this return the code below would fail on the
        # unbound raw_df and hide the message printed above.
        return

    PitchDataObject = PitchData(raw_df)
    PitchDataObject.extract_raw_data()
    active_df, cancel_df, executed_df, trade_df = PitchDataObject.createMultipleDataFrames()

    AggregateDataObject = AggregateData(active_df, cancel_df, executed_df, trade_df)
    AggregateDataObject.process_orders()
    AggregateDataObject.process_order_aggregates()
    AggregateDataObject.combine_order_aggregates()
    AggregateDataObject.process_volume_aggregate()

    output = AggregateDataObject.top_ten_movers()
    print(output)


# Run the pipeline only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == "__main__":
    main()

  stock_symbol  order_volume
0       OIH           5000.0
1       SPY           2000.0
2       DRYS          1209.0
3       ZVZZT          577.0
4       AAPL           495.0
5       PTR            400.0
6       UYG            400.0
7       FXP            320.0
8       DIA            229.0
9       BAC            210.0


In [2]:
# Slicing based on schema - As an extension we can plug in the schema types and slice the dataframes based on the defined schema
#  instead of writing hard-coded slice statements
# Define a schema dictionary for the different message types

# message_schemas = {
#     'A': {'timestamp': {'offset': 0,'length': 8},
#          'message_type': {'offset': 8,'length': 1},
#          'order_ID': {'offset': 9,'length': 12},
#          'side_indicator': {'offset':21,'length': 1},
#          'shares': {'offset':22,'length':6},
#          'stock_symbol': {'offset':28,'length':6},
#          'price': {'offset':34,'length':10},
#          'display': {'offset':44,'length':1}},
    
#     'E': {'timestamp': {'offset': 0,'length': 8},
#          'message_type': {'offset': 8,'length': 1},
#          'order_ID': {'offset': 9,'length': 12},
#          'executed_shares': {'offset':21,'length': 6},
#          'execution_id': {'offset':27,'length':12}},
    
#     'X':{'timestamp': {'offset': 0,'length': 8},
#          'message_type': {'offset': 8,'length': 1},
#          'order_ID': {'offset': 9,'length': 12},
#          'cancelled_shares': {'offset':21,'length': 6}},
    
#     'P':{'timestamp': {'offset': 0,'length': 8},
#          'message_type': {'offset': 8,'length': 1},
#          'order_ID': {'offset': 9,'length': 12},
#          'side_indicator': {'offset':21,'length': 1},
#          'shares': {'offset':22,'length':6},
#          'stock_symbol': {'offset':28,'length':6},
#          'price': {'offset':34,'length':10},
#          'execution_id': {'offset':44,'length':12}}}

# slice the frame based on offset and length (the slice stop is offset + length, not length)
#     eg: cancel_df['time_stamp'] = cancel_df['raw_message_data'].str.slice(0,8,1) can be re-written as:
#         cancel_timestamp_offset = message_schemas['X']['timestamp']['offset']
#         cancel_timestamp_length = message_schemas['X']['timestamp']['length']
#         cancel_df['time_stamp'] = cancel_df['raw_message_data'].str.slice(cancel_timestamp_offset,cancel_timestamp_offset + cancel_timestamp_length,1)
