In [1]:
import pandas as pd

Let's see how big is the file. Can I load it in memory?

In [2]:
import os
print(f"{os.path.getsize('pretrade_current.txt')//1024**2} MiB")

102 MiB


102 MiB assuming it is not compressed (since it is a txt file) we can safely process the whole file all togheter.
Let's see how many lines...

In [3]:
with open('pretrade_current.txt', 'r') as f:
    total_len = len(f.readlines())
    print(total_len)
    

751320


Check the head to see the schema of a single line

In [4]:
with open('pretrade_current.txt', 'r') as f:
    head_file = [f.readline() for _ in range(10)]

In [5]:
print(head_file[0])

0 {{"msgType_":8,"length_":36,"seqNo_":280},"security_":{"securityId_":4749,"umtf_":"FUTRl","isin_":"GB00BYZN9041","currency_":"GBX","mic_":"XLON","tickTableId_":33,"flags_":"{"b_":{"closingEnabled_":1,"testStock_":0,"illiquid":0,"live_":1,"aodEnabled_":1},"v_":25}}}



Ok I can filter useful line from timestamps using a regex rule...

In [6]:
import re

# This works to extract all {} and remove noise
rule = re.compile('(?:\{(\{.*\}),(\".*\":\{.*\})\})')

In [7]:
rule.search(head_file[0]).group(0)

'{{"msgType_":8,"length_":36,"seqNo_":280},"security_":{"securityId_":4749,"umtf_":"FUTRl","isin_":"GB00BYZN9041","currency_":"GBX","mic_":"XLON","tickTableId_":33,"flags_":"{"b_":{"closingEnabled_":1,"testStock_":0,"illiquid":0,"live_":1,"aodEnabled_":1},"v_":25}}}'

In [8]:
print(rule.search('1646824440194545000 (2022-03-09T11:14:00.194545) 14'))

None


In [None]:
Maybe it would have been easier to match the time stamp... nevermind...

In [9]:
with open('pretrade_current.txt', 'r') as f:
    group1 = []
    group2 = []
    for line in f:
        match = rule.search(line)
        if match:
            group1.append(match.group(1)) # Group 1 is the first {...} element with msgType
             group2.append(match.group(2)) # Group 2 is the rest "security", "book..."

In [10]:
len(group1) == len(group2)

True

We may need to consider only two types of lines, type=8 and type=12, let's continue to play with regex

In [11]:
stringa = '''{{"msgType_":8,"length_":36,"seqNo_":280},"security_":{"securityId_":4749,"umtf_":"FUTRl","isin_":"GB00BYZN9041","currency_":"GBX","mic_":"XLON","tickTableId_":33,"flags_":"{"b_":{"closingEnabled_":1,"testStock_":0,"illiquid":0,"live_":1,"aodEnabled_":1},"v_":25}}}'''

rule_type8 = re.compile('"msgType_":8')
rule_type12 = re.compile('"msgType_":12')
rule_typeX = re.compile('"msgType_":\\d')

a = rule_typeX.search(stringa)
a.string

'{{"msgType_":8,"length_":36,"seqNo_":280},"security_":{"securityId_":4749,"umtf_":"FUTRl","isin_":"GB00BYZN9041","currency_":"GBX","mic_":"XLON","tickTableId_":33,"flags_":"{"b_":{"closingEnabled_":1,"testStock_":0,"illiquid":0,"live_":1,"aodEnabled_":1},"v_":25}}}'

Ok new regex based filtering, will pick up all different type of msgType assuming they will be useful in a possible future

In [12]:
filter_rule = re.compile('"msgType_":(\\d+)')

with open('pretrade_current.txt', 'r') as f:
    groups = {}
    rest = []
    for line in f:
        line_match = filter_rule.search(line)
        if line_match:
            group_match = line_match.group(1)

            if group_match not in groups.keys():
                groups[group_match] = []

            groups[group_match].append(line_match.string)
        else :
            rest.append(line)


print(groups.keys())

dict_keys(['8', '10', '11', '12'])


In [13]:
sum = 0

for key in groups.keys():
    count = len(groups[key])
    print(count)
    sum += count

print(f"{sum=}")
print(f"{total_len-sum=}")
# print(f"{len(rest)=}")

1693
39
69186
606775
sum=677693
total_len-sum=73627


In [22]:
import random
random.choices(rest[:10], k=20)

['1646812740000460000 (2022-03-09T07:59:00.000460) 1\n',
 '1646812740000541000 (2022-03-09T07:59:00.000541) 1\n',
 '1646812740000509000 (2022-03-09T07:59:00.000509) 1\n',
 '1646812740000509000 (2022-03-09T07:59:00.000509) 1\n',
 '1646812740000509000 (2022-03-09T07:59:00.000509) 1\n',
 '1646812740000485000 (2022-03-09T07:59:00.000485) 1\n',
 '1646812740000460000 (2022-03-09T07:59:00.000460) 1\n',
 '1646812740000383000 (2022-03-09T07:59:00.000383) 1\n',
 '1646812740000431000 (2022-03-09T07:59:00.000431) 1\n',
 '1646812740000408000 (2022-03-09T07:59:00.000408) 1\n',
 '1646812740000460000 (2022-03-09T07:59:00.000460) 1\n',
 '1646812740000509000 (2022-03-09T07:59:00.000509) 1\n',
 '1646812740000509000 (2022-03-09T07:59:00.000509) 1\n',
 '1646812740000485000 (2022-03-09T07:59:00.000485) 1\n',
 '1646812740000352000 (2022-03-09T07:59:00.000352) 1\n',
 '1646812740000352000 (2022-03-09T07:59:00.000352) 1\n',
 '1646812740000282000 (2022-03-09T07:59:00.000282) 2\n',
 '1646812740000541000 (2022-03-

Ok now let's parse the useful messages...

In [23]:
list8 = [re.search('(?:\"\\w+\")\\:(\\{.*\\})\\}$', entry).group(1) for entry in groups['8']]
list12 = [re.search('(?:\"\\w+\")\\:(\\{.*\\})\\}$', entry).group(1) for entry in groups['12']]

In [24]:
print(list8[0])
# eval(list8[9])
ast.literal_eval(list8[9])

{"securityId_":4749,"umtf_":"FUTRl","isin_":"GB00BYZN9041","currency_":"GBX","mic_":"XLON","tickTableId_":33,"flags_":"{"b_":{"closingEnabled_":1,"testStock_":0,"illiquid":0,"live_":1,"aodEnabled_":1},"v_":25}}


NameError: name 'ast' is not defined

In [None]:
print(list12[9])
# eval(list12[9])
ast.literal_eval(list12[9])

NameError: name 'list12' is not defined

No luck with literal evaluation need to parse by hand

Sadly I cannot parse directly the messages into dictionaries or JSON (they appear to me to not be correctly formatted for this)

Ok will parse directly the useful entities for type 8 and 12, this could be problematic if I had more types.

In [None]:
# {"securityId_":4749,"umtf_":"FUTRl","isin_":"GB00BYZN9041","currency_":"GBX","mic_":"XLON","tickTableId_":33,"flags_":"{"b_":{"closingEnabled_":1,"testStock_":0,"illiquid":0,"live_":1,"aodEnabled_":1},"v_":25}}

parse_string8 = (
    '\"securityId_\"\\:(\\d+),' #1
    '\"umtf_\"\\:\"(\\w+)\",' #2
    '\"isin_\"\\:\"(\\w+)\",' #3
    '\"currency_\"\\:\"(\\w+)\",' #4
    '\"mic_\"\\:\"(\\w+)\",' #5
    '\"tickTableId_\"\\:(\\d+),' #6
)

# {"securityId_":2,"side_":SELL,"quantity_":390,"price_":119350000,"orderId_":443445}
parse_string12 = (
    '\"securityId_\"\\:(\\d+),' #1
    '\"side_\"\\:(\\w+),' #2
    '\"quantity_\"\\:(\\d+),' #3
    '\"price_\"\\:(\\d+),' #4
    '\"orderId_\"\\:(\\d+)' #5
    # '\\}'
)

In [26]:
print(list8[9])
print(re.search(parse_string8, list8[9]))
for i in range(1, 7):
    print(re.search(parse_string8, list8[9]).group(i))

{"securityId_":3466,"umtf_":"ITPp","isin_":"FR0004024222","currency_":"EUR","mic_":"XPAR","tickTableId_":32,"flags_":"{"b_":{"closingEnabled_":0,"testStock_":0,"illiquid":0,"live_":1,"aodEnabled_":0},"v_":8}}
<re.Match object; span=(1, 108), match='"securityId_":3466,"umtf_":"ITPp","isin_":"FR0004>
3466
ITPp
FR0004024222
EUR
XPAR
32


In [27]:


list_dict8 = []
for elements in list8:
    match = re.search(parse_string8, elements)
    if match:
        tmp_dict = {
            'securityId': match.group(1),
            'umtf': match.group(2),
            'isin': match.group(3),
            'currency': match.group(4),
            'mic': match.group(5),
            'tickTableId': match.group(6),
        }
        list_dict8.append(tmp_dict)

# print(list_dict8)
df8 = pd.DataFrame(list_dict8)  
df8.head()


Unnamed: 0,securityId,umtf,isin,currency,mic,tickTableId
0,4749,FUTRl,GB00BYZN9041,GBX,XLON,33
1,4491,BVCl,IL0010849045,GBX,XLON,32
2,4257,AVONl,GB0000667013,GBX,XLON,33
3,3473,TOKMAh,FI4000197934,EUR,XHEL,33
4,3472,UIEc,BSP951331318,DKK,XCSE,31


In [28]:
print(list12[9])
print(parse_string12)
print(re.search(parse_string12, list12[22]))
for i in range(1,6):
    print(re.search(parse_string12, list12[9]).group(i))

{"securityId_":2,"side_":SELL,"quantity_":390,"price_":119350000,"orderId_":443445}
"securityId_"\:(\d+),"side_"\:(\w+),"quantity_"\:(\d+),"price_"\:(\d+),"orderId_"\:(\d+)
<re.Match object; span=(1, 81), match='"securityId_":13,"side_":SELL,"quantity_":750,"pr>
2
SELL
390
119350000
443445


In [83]:
list_dict12 = []
for elements in list12:
    match = re.search(parse_string12, elements)
    if match:
        tmp_dict = {
            'securityId': match.group(1),
            'side': match.group(2),
            'quantity': match.group(3),
            'price': match.group(4),
            'orderId': match.group(5),
        }
        list_dict12.append(tmp_dict)

# print(list_dict12)
df12 = pd.DataFrame(list_dict12)  
df12.head()


Unnamed: 0,securityId,side,quantity,price,orderId
0,2,BUY,437,116950000,480351
1,2,BUY,500,117000000,222046
2,2,BUY,396,117450000,432574
3,2,BUY,446,117850000,40
4,2,BUY,500,118050000,447953


Let's start to process it

In [112]:
df12_typed = df12.astype({
    'securityId': 'Int64',
    'side': 'category',
    'quantity': 'Int64',
    'price': 'Int64',
    'orderId': 'Int64',
})

In [171]:
df12_unique = df12_typed.drop_duplicates(keep='last')

I decided to keep the latest one of the duplicates. Assuming there have been corrections in the process generating this messages, the last one should be the correct one.

Will start to generate the aggregate data using groupby function on identityId which is shared with type8 and side which is what has been requested by the assignment

In [185]:
df12_sum = (
    df12_unique
    .groupby(['securityId','side'])
    .sum()
    .loc[:,'quantity']
    .unstack(level=-1)
    .rename({'BUY':'Total Buy Quantity', 'SELL':'Total Sell Quantity'}, axis=1)
)
df12_sum

side,Total Buy Quantity,Total Sell Quantity
securityId,Unnamed: 1_level_1,Unnamed: 2_level_1
2,219426,243127
8,200,0
13,227077,232121
22,0,10
29,74594,72795
...,...,...
3460,1432,1074
3461,2954,9284
4257,17557,19042
4491,306556,324794


In [149]:
df12_count = (
    df12_unique
    .groupby(['securityId', 'side'])
    .count()
    .loc[:, 'quantity']
    .unstack(level=-1)
    .rename({'BUY':'Total Buy Count', 'SELL':'Total Sell Count'}, axis=1)
)
df12_count

side,Total Buy Count,Total Sell Count
securityId,Unnamed: 1_level_1,Unnamed: 2_level_1
2,872,950
8,2,0
13,770,776
22,0,1
29,1601,1540
...,...,...
3460,4,3
3461,7,22
4257,83,94
4491,74,75


In [151]:
df12_unique['weighted_price'] = (df12_unique['price']/df12_unique['quantity'])
df12_weighted = (
    df12_unique
    .groupby(['securityId', 'side'])
    .mean()
    .loc[:, 'weighted_price']
    .unstack()
    .rename({'BUY':'Weighted Average Buy Price', 'SELL':'Weighted Average Sell Price'}, axis=1)
)
df12_weighted

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df12_unique['weighted_price'] = (df12_unique['price']/df12_unique['quantity'])


side,Weighted Average Buy Price,Weighted Average Sell Price
securityId,Unnamed: 1_level_1,Unnamed: 2_level_1
2,695115.758248,558870.753888
8,1534.0,
13,23813.483875,15278.386506
22,,95800.0
29,23909402.611458,24192437.305632
...,...,...
3460,7381.98324,7402.234637
3461,5315.504401,5326.217148
4257,12286661.961952,1860744.358011
4491,2607.349215,2686.171372


In [210]:
df12_max = (
    df12_unique
    .groupby(['securityId', 'side'])
    .max()
    .loc[:, 'price']
    .unstack(level=-1)
    .loc[:, 'BUY']
    .rename("Max Buy Price")
)
df12_max

securityId
2       122350000
8          153400
13        3072000
22           <NA>
29      870600000
          ...    
3460      2652000
3461      2250900
4257    122300000
4491      4770000
4749    249200000
Name: Max Buy Price, Length: 860, dtype: Int64

In [211]:
df12_min = (
    df12_unique
    .groupby(['securityId', 'side'])
    .min()
    .loc[:, 'price']
    .unstack(level=-1)
    .loc[:, 'SELL']
    .rename("Min Sell Price")
)
df12_min

securityId
2       118000000
8            <NA>
13        2994000
22         958000
29      805600000
          ...    
3460      2638500
3461      2240600
4257    119000000
4491      4445000
4749    236400000
Name: Min Sell Price, Length: 860, dtype: Int64

In [212]:
df12_concat = pd.concat([df12_count, df12_sum, df12_weighted, df12_max, df12_min], axis=1)

In [213]:
df12_concat

Unnamed: 0_level_0,Total Buy Count,Total Sell Count,Total Buy Quantity,Total Sell Quantity,Weighted Average Buy Price,Weighted Average Sell Price,Max Buy Price,Min Sell Price
securityId,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1
2,872,950,219426,243127,695115.758248,558870.753888,122350000,118000000
8,2,0,200,0,1534.0,,153400,
13,770,776,227077,232121,23813.483875,15278.386506,3072000,2994000
22,0,1,0,10,,95800.0,,958000
29,1601,1540,74594,72795,23909402.611458,24192437.305632,870600000,805600000
...,...,...,...,...,...,...,...,...
3460,4,3,1432,1074,7381.98324,7402.234637,2652000,2638500
3461,7,22,2954,9284,5315.504401,5326.217148,2250900,2240600
4257,83,94,17557,19042,12286661.961952,1860744.358011,122300000,119000000
4491,74,75,306556,324794,2607.349215,2686.171372,4770000,4445000


Nice now I just need to join it with type8 dataframe

In [226]:
df8_typed = (df8
.drop(['umtf', 'mic', 'tickTableId'], axis=1)
.astype({
    'securityId': 'Int64',
    'isin': 'string',
    'currency': 'string'
})
)

In [242]:
df8_indexed = (
    df8_typed
    .set_index('securityId')
    .sort_index()
    .rename({
        'isin': 'ISIN',
        'currency': 'Currency'
    }, axis = 1 )
)

In [235]:
len(set(df12_concat.index).intersection(set(df8_indexed.index)))

792

In [243]:
final_df = pd.concat([df8_indexed,df12_concat], join='inner', axis=1)

In [244]:
final_df

Unnamed: 0_level_0,ISIN,Currency,Total Buy Count,Total Sell Count,Total Buy Quantity,Total Sell Quantity,Weighted Average Buy Price,Weighted Average Sell Price,Max Buy Price,Min Sell Price
securityId,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1
2,GB00B1YW4409,GBX,872,950,219426,243127,695115.758248,558870.753888,122350000,118000000
8,IT0001233417,EUR,2,0,200,0,1534.0,,153400,
13,CH0012221716,CHF,770,776,227077,232121,23813.483875,15278.386506,3072000,2994000
22,ES0132105018,EUR,0,1,0,10,,95800.0,,958000
29,IE00BWT6H894,GBX,1601,1540,74594,72795,23909402.611458,24192437.305632,870600000,805600000
...,...,...,...,...,...,...,...,...,...,...
3453,LU0252633754,EUR,28,28,19600,19436,17958.061224,18231.624277,12858000,12416000
3461,FR0010361683,EUR,7,22,2954,9284,5315.504401,5326.217148,2250900,2240600
4257,GB0000667013,GBX,83,94,17557,19042,12286661.961952,1860744.358011,122300000,119000000
4491,IL0010849045,GBX,74,75,306556,324794,2607.349215,2686.171372,4770000,4445000


In case I need the exact same header as the email

In [246]:
" | ".join(final_df.columns)

'ISIN | Currency | Total Buy Count | Total Sell Count | Total Buy Quantity | Total Sell Quantity | Weighted Average Buy Price | Weighted Average Sell Price | Max Buy Price | Min Sell Price'

In [249]:
final_df.to_csv("aggregate_notebook.tsv", sep='\t', na_rep='NA', index=False)

Ok now to clean up, use proper naming and proper formatting -> main.py and ausiliary files