In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import array, col, explode, lit, coalesce, isnull, countDistinct, sumDistinct, to_timestamp, to_date, substring, concat, year, format_number, max, count, when, min, month, dayofmonth, udf, first, last, row_number, lag, sum, mean, datediff, to_date, lit, avg, substring, dense_rank, rank


from pyspark.sql.types import StringType, LongType, DateType, IntegerType
import pandas as pd
import numpy as np
import datetime

In [0]:
# Load the input table that the cancellation/correction/reversal filter runs on.
source_table = "maximilian.trace_to_be_filtered"
trace_to_be_filtered = spark.sql(f"select * from {source_table}")

# Filter Step 

Remove transaction records:
- cancelled records, and records that were subsequently corrected or reversed.

### Post 2012/02/06 Data

Following the Dick-Nielsen Approach.

In [0]:
# Parse the trade report date into a DateType column and derive its
# year / month / day-of-month parts.
# The raw value is first cast to string and then read with the 'yyyyMMdd' pattern.
rpt_dt_as_date = to_date(col('t_trd_rpt_dt').cast(StringType()), 'yyyyMMdd')

step_8_1 = (
    trace_to_be_filtered
    .withColumn('r_t_trd_rpt_dt', rpt_dt_as_date)
    .withColumn('r_t_trd_rpt_dt_year', year('r_t_trd_rpt_dt'))
    .withColumn('r_t_trd_rpt_dt_month', month('r_t_trd_rpt_dt'))
    .withColumn('r_t_trd_rpt_dt_day', dayofmonth('r_t_trd_rpt_dt'))
)

# Row count of the unfiltered input; reported in the final summary cell.
before_clean_count = step_8_1.count()

In [0]:
# Number of records with a CUSIP per report year.
records_per_year = step_8_1.groupBy('r_t_trd_rpt_dt_year').agg(count(col('t_cusip_id')))
records_per_year.show()

In [0]:
# Post-2012 sample: report date on or after 2012-02-06 and a non-missing CUSIP.
post_has_cusip = col('t_cusip_id').isNotNull()
post_cutoff_reached = col('r_t_trd_rpt_dt') >= lit('2012-02-06')

step_8_2 = step_8_1.filter(post_has_cusip & post_cutoff_reached)

post_starting_obs = step_8_2.count()

print(f'starting observations: {post_starting_obs:,}')

In [0]:
#overview over the trade stati
step_8_2.groupBy('t_trc_st').agg(count(col('t_trc_st'))).show()
#T = Trade Report
#C = Trade Cancellation
#W = Trade Correction
#X = Trade Cancellation
#Y = Trade Reversals
#R = Trade Reversals

In [0]:
#overview over the as of field values
step_8_2.groupBy('t_asof_cd').agg(count('t_asof_cd')).show()
#R = Reversals
#A = As of (reported late)

In [0]:
# Split the post-2012 sample by trade status.
post_status = col('t_trc_st')

post_TR = step_8_2.filter((post_status == 'T') | (post_status == 'R'))  # normal trades
post_XC = step_8_2.filter((post_status == 'X') | (post_status == 'C'))  # cancellations
post_Y = step_8_2.filter(post_status == 'Y')                            # reversals

post_TR_starting_obs = post_TR.count()
post_XC_starting_obs = post_XC.count()
post_Y_starting_obs = post_Y.count()

print(f'count post_TR: {post_TR_starting_obs:,}')
print(f'count post_XC: {post_XC_starting_obs:,}')
print(f'count post_Y: {post_Y_starting_obs:,}')

#### Cancellations and Corrections

In [0]:
# Flag every normal trade that has a matching cancellation/correction (X/C) record.
# A match requires equality on all of the keys below; unmatched trades get a
# null 'trc_st_xc' because the join is a left join.
xc_match_keys = [
    't_cusip_id',
    't_trd_exctn_dt',
    't_trd_exctn_tm',
    't_rptd_pr',
    't_entrd_vol_qt',
    't_rpt_side_cd',
    't_cntra_mp_id',
    't_msg_seq_nb',
]

xc_match_cond = None
for xc_key in xc_match_keys:
    xc_same_key = col(f'a.{xc_key}') == col(f'b.{xc_key}')
    xc_match_cond = xc_same_key if xc_match_cond is None else (xc_match_cond & xc_same_key)

clean_post1 = (
    post_TR.alias('a')
    .join(post_XC.alias('b'), xc_match_cond, how='left')
    .select("a.*", col('b.t_trc_st').alias('trc_st_xc'))
)

xc_matched = clean_post1.filter(col('trc_st_xc').isNotNull()).count()
print(f'matched records: {xc_matched:,}')
xc_unmatched = clean_post1.filter(col('trc_st_xc').isNull()).count()
print(f'other records: {xc_unmatched:,}')
xc_total = clean_post1.count()
print(f'total records: {xc_total:,}')

In [0]:
#overview/check after join
clean_post1.groupBy('trc_st_xc').agg(count(col('trc_st_xc'))).show()

In [0]:
#Removing the matched records
clean_post1 = clean_post1.filter(col('trc_st_xc').isNull())

clean_post1_count = clean_post1.count()

In [0]:
print('** Summary: Cleaning Step 1.1 **')
print('Observations before cleaning: {:,}'.format(post_TR_starting_obs))
print('Observations after removing cancellations and corrections: {:,} '.format(clean_post1_count))
print('Percentage cleaned: {:,}%'.format(round(((clean_post1_count-post_TR_starting_obs)/post_TR_starting_obs*100*-1),2)))

#### Reversals

In [0]:
# Flag every remaining trade that has a matching reversal (Y) record.
# Each pair is (column on the trade side, column on the reversal side); the
# trade's message sequence number is compared with the reversal's *original*
# message sequence number.
y_match_pairs = [
    ('t_cusip_id', 't_cusip_id'),
    ('t_trd_exctn_dt', 't_trd_exctn_dt'),
    ('t_trd_exctn_tm', 't_trd_exctn_tm'),
    ('t_rptd_pr', 't_rptd_pr'),
    ('t_entrd_vol_qt', 't_entrd_vol_qt'),
    ('t_rpt_side_cd', 't_rpt_side_cd'),
    ('t_cntra_mp_id', 't_cntra_mp_id'),
    ('t_msg_seq_nb', 't_orig_msg_seq_nb'),
]

y_match_cond = None
for y_left, y_right in y_match_pairs:
    y_same_key = col(f'a.{y_left}') == col(f'b.{y_right}')
    y_match_cond = y_same_key if y_match_cond is None else (y_match_cond & y_same_key)

clean_post2 = (
    clean_post1.alias('a')
    .join(post_Y.alias('b'), y_match_cond, how='left')
    .select("a.*", col('b.t_trc_st').alias('trc_st_y'))
)

y_matched = clean_post2.filter(col('trc_st_y').isNotNull()).count()
print(f'matched records: {y_matched:,}')
y_unmatched = clean_post2.filter(col('trc_st_y').isNull()).count()
print(f'other records: {y_unmatched:,}')
y_total = clean_post2.count()
print(f'total records: {y_total:,}')

In [0]:
#overview/check after join
clean_post2.groupBy('trc_st_y').agg(count(col('trc_st_y'))).show()

In [0]:
#Removing matched reversals
clean_post2 = clean_post2.filter(col('trc_st_y').isNull())

clean_post2_count = clean_post2.count()

In [0]:
print('** Summary: Cleaning Step 1.2 **')
print('Observations before cleaning: {:,}'.format(clean_post1_count))
print('Observations after removing cancellations and corrections: {:,} '.format(clean_post2_count))
print('Percentage cleaned: {:,}%'.format(round(((clean_post2_count-clean_post1_count)/clean_post1_count*100*-1),2)))

In [0]:
print('** Summary: Cleaning Step 1 **')
print('Observations before cleaning: {:,}'.format(post_TR_starting_obs))
print('Observations after removing cancellations, corrections, and reversals: {:,} '.format(clean_post2_count))
print('Percentage cleaned: {:,}%'.format(round(((clean_post2_count-post_TR_starting_obs)/post_TR_starting_obs*100*-1),2)))

### Pre 2012/02/06 Data

In [0]:
# Pre-2012 sample: report date before 2012-02-06 and a non-missing CUSIP.
pre_has_cusip = col('t_cusip_id').isNotNull()
pre_before_cutoff = col('r_t_trd_rpt_dt') < lit('2012-02-06')

step_8_3 = step_8_1.filter(pre_has_cusip & pre_before_cutoff)

pre_starting_obs = step_8_3.count()
print(f'starting observations: {pre_starting_obs:,}')

In [0]:
#overview over the trade stati
step_8_3.groupBy('t_trc_st').agg(count(col('t_trc_st'))).show()
#T = Trade Report
#C = Trade Cancellation
#W = Trade Correction
#X = Trade Cancellation
#Y = Trade Reversals
#R = Trade Reversals

In [0]:
#overview over the as of field values
step_8_3.groupBy('t_asof_cd').agg(count('t_asof_cd')).show()
#R = Reversals
#A = As of (reported late)
#D = Delayed dissemination
#X = Delayed reversal

In [0]:
# Split the pre-2012 sample by trade status.
pre_status = col('t_trc_st')

pre_C = step_8_3.filter(pre_status == 'C')  # Trade Cancellation
pre_W = step_8_3.filter(pre_status == 'W')  # Correction
pre_T = step_8_3.filter(pre_status == 'T')  # Trade Report

pre_C_starting_obs = pre_C.count()
pre_W_starting_obs = pre_W.count()
pre_T_starting_obs = pre_T.count()

print(f'count pre_C: {pre_C_starting_obs:,}')
print(f'count pre_W: {pre_W_starting_obs:,}')
print(f'count pre_T: {pre_T_starting_obs:,}')

#### Cancellations

In [0]:
# Flag every trade report that has a matching cancellation (C) record.
# Each pair is (column on the trade side, column on the cancellation side); the
# trade's message sequence number is compared with the cancellation's
# *original* message sequence number.
c_match_pairs = [
    ('t_cusip_id', 't_cusip_id'),
    ('t_trd_exctn_dt', 't_trd_exctn_dt'),
    ('t_trd_exctn_tm', 't_trd_exctn_tm'),
    ('t_rptd_pr', 't_rptd_pr'),
    ('t_entrd_vol_qt', 't_entrd_vol_qt'),
    ('t_trd_rpt_dt', 't_trd_rpt_dt'),
    ('t_msg_seq_nb', 't_orig_msg_seq_nb'),
]

c_match_cond = None
for c_left, c_right in c_match_pairs:
    c_same_key = col(f'a.{c_left}') == col(f'b.{c_right}')
    c_match_cond = c_same_key if c_match_cond is None else (c_match_cond & c_same_key)

clean_pre1 = (
    pre_T.alias('a')
    .join(pre_C.alias('b'), c_match_cond, how='left')
    .select("a.*", col('b.t_trc_st').alias('trc_st_c'))
)

In [0]:
#overview/check after join
clean_pre1.groupBy('trc_st_c').agg(count('trc_st_c')).show()

In [0]:
#Removing matched records
clean_pre1 = clean_pre1.filter(col('trc_st_c').isNull())

clean_pre1_count = clean_pre1.count()

In [0]:
print('** Summary: Cleaning Step 2.1 **')
print('Observations before cleaning: {:,}'.format(pre_T_starting_obs))
print('Observations after removing cancellations and corrections: {:,} '.format(clean_pre1_count))
print('Percentage cleaned: {:,}%'.format(round(((clean_pre1_count-pre_T_starting_obs)/pre_T_starting_obs*100*-1),2)))

NOTE ON CANCELLATION (C) CASES: 

Fewer obs canceled using this left join than the ones labeled as C in the original dataset. 
- No of TRC_ST = C in data = 1,383,219
- No of cases in _DEL_C = 1,356,948


Reason: 
Some TRC_ST=C cases show an ORIG_MSG_SEQ_NB that does not exist in the original dataset.

Example: 
Among the TRC_ST=C records, the one with BOND_SYM_ID=AA.HM, TRD_EXCTN_DT=20011003, TRD_EXCTN_TM=9:25:54 and RPTD_PR=103.144999 has ORIG_MSG_SEQ_NB=0033378.
Among the original TRC_ST=T records, no observation has this MSG_SEQ_NB. The record that appears to be the cancelled trade has MSG_SEQ_NB=0001482.
Therefore, this 0001482 record is not deleted from the sample.

#### Corrections

In [0]:
#Remove corrections

#NOTE: on a given day, a bond can have more than one round of correction
# One W to correct an older W, which then corrects the original T
# Before joining back to the T data, first need to clean out the W to handle the situation described above
# The following section handles the chain of W cases

# 2.2.1 Sort out all msg_seq_nb and orig_msg_seq_nb
w_msg = pre_W.select('t_cusip_id', 't_bond_sym_id', 't_trd_exctn_dt', 't_trd_exctn_tm', 't_msg_seq_nb')
w_msg = w_msg.withColumn('r_flag',when(col('t_msg_seq_nb').isNotNull() == True, 'msg'))

w_omsg = pre_W.select('t_cusip_id', 't_bond_sym_id', 't_trd_exctn_dt', 't_trd_exctn_tm', 't_orig_msg_seq_nb')
w_omsg = w_omsg.withColumn('r_flag',when(col('t_orig_msg_seq_nb').isNotNull() == True, 'omsg'))
w_omsg = w_omsg.withColumnRenamed('t_orig_msg_seq_nb','t_msg_seq_nb')

In [0]:
#create an union of all msg seq numbers to compare them to each transaction
w_ = w_omsg.unionByName(w_msg)

In [0]:
# 2.2.2 Count how often each msg_seq_nb appears (r_napp).
# A number that appears more than once is part of a later correction.
w_napp_keys = ['t_cusip_id', 't_bond_sym_id', 't_trd_exctn_dt', 't_trd_exctn_tm', 't_msg_seq_nb']
w_napp = w_.groupBy(*w_napp_keys).agg(count('t_msg_seq_nb').alias('r_napp'))

# Overview of the number of appearances.
w_napp_overview = w_napp.groupBy('r_napp').agg(count('r_napp'))
w_napp_overview.show()

In [0]:
# 2.2.3 Check whether a msg_seq_nb occurs both as msg and as orig_msg, or only
# as orig_msg. If a number that appears more than once occurs only as
# orig_msg, several msg_seq_nb point to the same orig_msg_seq_nb for correction.
w_mult_keys = ['t_cusip_id', 't_bond_sym_id', 't_trd_exctn_dt', 't_trd_exctn_tm', 't_msg_seq_nb']

w_mult = w_.select(*w_mult_keys, 'r_flag').distinct()

# r_ntype = number of distinct (number, flag) rows per sequence number.
w_mult1 = w_mult.groupBy(*w_mult_keys).agg(count('t_msg_seq_nb').alias('r_ntype'))

# Overview of r_ntype; the original author notes most numbers appear only once.
w_ntype_overview = w_mult1.groupBy('r_ntype').agg(count('r_ntype'))
w_ntype_overview.show()

In [0]:
# Reference only: the SAS statements that the previous cell ports to PySpark.
# They are kept as comments because raw SAS is not valid Python; left as code,
# this cell raises a SyntaxError and breaks "Run All".
#
# proc sql;
#  create table __w_mult as select distinct cusip_id, bond_sym_id, trd_exctn_dt, trd_exctn_tm, msg_seq_nb, flag from __w; quit;
# proc sql;
#  create table __w_mult1 as select distinct cusip_id, bond_sym_id, trd_exctn_dt, trd_exctn_tm, msg_seq_nb, count(*) as ntype
#  from __w_mult group by cusip_id, trd_exctn_dt, trd_exctn_tm, msg_seq_nb;
# quit;

In [0]:
# 2.2.4 Attach r_ntype to the appearance counts (r_napp).
w_comb_keys = ['t_cusip_id', 't_trd_exctn_dt', 't_trd_exctn_tm', 't_msg_seq_nb']

w_comb_cond = None
for w_comb_key in w_comb_keys:
    w_comb_same = col(f'a.{w_comb_key}') == col(f'b.{w_comb_key}')
    w_comb_cond = w_comb_same if w_comb_cond is None else (w_comb_cond & w_comb_same)

w_comb = (
    w_napp.alias('a')
    .join(w_mult1.alias('b'), w_comb_cond, how='left')
    .select("a.*", col('b.r_ntype').alias('r_ntype'))
    .distinct()
)

# Overview of the (r_napp, r_ntype) combinations.
w_comb_overview = w_comb.groupby('r_napp', 'r_ntype').agg(count('r_napp'), count('r_ntype'))
w_comb_overview.show()

In [0]:
# Keep the sequence numbers with napp = 1, or with napp > 1 but ntype = 1, and
# map them back to the stacked W numbers (by CUSIP, execution date/time and
# sequence number) to recover their flag.
appears_once = col('r_napp') == 1
repeated_single_type = (col('r_napp') > 1) & (col('r_ntype') == 1)
w_comb_2 = w_comb.filter(appears_once | repeated_single_type)

w_keep_keys = ['t_cusip_id', 't_trd_exctn_dt', 't_trd_exctn_tm', 't_msg_seq_nb']

w_keep_cond = None
for w_keep_key in w_keep_keys:
    w_keep_same = col(f'a.{w_keep_key}') == col(f'b.{w_keep_key}')
    w_keep_cond = w_keep_same if w_keep_cond is None else (w_keep_cond & w_keep_same)

w_keep = (
    w_comb_2.alias('a')
    .join(w_.alias('b'), w_keep_cond, how='left')
    .select("a.*", col('b.r_flag').alias('r_flag'))
    .distinct()
)

In [0]:
#2.2.5 Caluclate no of pair of records;
w_keep_gb = w_keep.groupBy('t_cusip_id', 't_trd_exctn_dt', 't_trd_exctn_tm').agg((count('*')/2).alias('r_npair')) 

In [0]:
w_keep_gb_flag = \
    w_keep_gb.alias('a') \
    .join(
        w_keep.alias('b'),
        ((col('a.t_cusip_id') == col('b.t_cusip_id'))
        &(col('a.t_trd_exctn_dt') == col('b.t_trd_exctn_dt'))
        &(col('a.t_trd_exctn_tm') == col('b.t_trd_exctn_tm'))),
        how='left'
    ) \
    .select("a.*", col('b.r_flag'),col('b.t_msg_seq_nb')).distinct()

In [0]:
# Time stamps with exactly one pair of entries: transpose the flag column so
# each time stamp becomes one row with the 'msg' and 'omsg' numbers side by side.
# NOTE(review): toPandas() collects these rows on the driver, and pivot_table
# uses its default aggregation - confirm both are acceptable at full data size.
single_pair_rows = w_keep_gb_flag.filter(col('r_npair') == 1)

w_keep1_index = ['t_cusip_id', 't_trd_exctn_dt', 't_trd_exctn_tm']
w_keep1_pd = pd.pivot_table(
    single_pair_rows.toPandas(),
    values='t_msg_seq_nb',
    index=w_keep1_index,
    columns='r_flag',
)

# Cast both pivoted columns to integer.
for w_flag_col in ('msg', 'omsg'):
    w_keep1_pd[w_flag_col] = w_keep1_pd[w_flag_col].astype(int)

w_keep1_pd = w_keep1_pd.rename(columns={"msg": "t_msg_seq_nb", 'omsg': "t_orig_msg_seq_nb"})

# Back to Spark, with the index levels restored as ordinary columns.
w_keep1 = spark.createDataFrame(w_keep1_pd.reset_index())

In [0]:
#For records with more than one pair of entry at a given time stamp - join back the original msg_seq_nb;
w_keep_this = w_keep_gb_flag.filter((col('r_flag') == 'msg' ) & (col('r_npair') > 1))

w_keep2 = \
    w_keep_this.alias('a') \
    .join(
        pre_W.alias('b'),
        ((col('a.t_cusip_id') == col('b.t_cusip_id'))
        &(col('a.t_trd_exctn_dt') == col('b.t_trd_exctn_dt'))
        &(col('a.t_trd_exctn_tm') == col('b.t_trd_exctn_tm'))
        &(col('a.t_msg_seq_nb') == col('b.t_msg_seq_nb'))),
        how='left'
    ) \
    .select("a.*", col('b.t_orig_msg_seq_nb')).distinct().drop('a.r_npair')


w_keep2 = w_keep2.drop('r_npair','r_flag')

In [0]:
w__clean = w_keep1.unionByName(w_keep2)

In [0]:
# 2.2.6 Join back to get all the other information;
pre_w1 = pre_W.drop('t_orig_msg_seq_nb')

w_clean =\
    w__clean.alias('a') \
    .join(
        pre_w1.alias('b'),
        ((col('a.t_cusip_id') == col('b.t_cusip_id'))
        &(col('a.t_trd_exctn_dt') == col('b.t_trd_exctn_dt'))
        &(col('a.t_trd_exctn_tm') == col('b.t_trd_exctn_tm'))
        &(col('a.t_msg_seq_nb') == col('b.t_msg_seq_nb'))),
        how='left'
    ) \
    .select('a.t_orig_msg_seq_nb','b.*').distinct()

In [0]:
w_clean.count(), w__clean.count()

In [0]:
#change name of columns for union
#old_columns = [x for x in rep_w.columns]
#new_column_name_list= list(map(lambda x: x.replace("w_", ""), rep_w.columns))

#merged_list = [(old_columns[i], new_column_name_list[i]) for i in range(0, len(old_columns))]


#for x, y in merged_list:
#  rep_w = rep_w.withColumnRenamed(x, y)


In [0]:
# Give both join inputs the same layout before matching them: 1000 partitions
# by CUSIP, sorted within each partition by CUSIP and execution date, cached.
layout_sort_cols = ("t_cusip_id", "t_trd_exctn_dt")

clean_pre1 = (
    clean_pre1
    .repartition(1000, "t_cusip_id")
    .sortWithinPartitions(*layout_sort_cols)
    .cache()
)
w_clean = (
    w_clean
    .repartition(1000, "t_cusip_id")
    .sortWithinPartitions(*layout_sort_cols)
    .cache()
)

In [0]:
# 2.2.7 Match the W records against the trade records, so that the matched T
# records can be deleted.
# Matching is by CUSIP, execution date and sequence number: a W record's
# orig_msg_seq_nb points at the msg_seq_nb of the record it corrects.
w_match_pairs = [
    ('t_cusip_id', 't_cusip_id'),
    ('t_trd_exctn_dt', 't_trd_exctn_dt'),
    ('t_msg_seq_nb', 't_orig_msg_seq_nb'),
]

w_match_cond = None
for w_left, w_right in w_match_pairs:
    w_match_same = col(f'a.{w_left}') == col(f'b.{w_right}')
    w_match_cond = w_match_same if w_match_cond is None else (w_match_cond & w_match_same)

w_match_cols = [
    col('b.t_trc_st').alias('r_t_trc_st_w'),
    col('b.t_msg_seq_nb').alias('r_t_mod_msg_seq_nb'),
    col('b.t_orig_msg_seq_nb').alias('r_t_mod_orig_msg_seq_nb'),
]

clean_pre2 = (
    clean_pre1.alias('a')
    .join(w_clean.alias('b'), w_match_cond, how='left')
    .select('a.*', *w_match_cols)
    .distinct()
)

clean_pre2 = clean_pre2.cache()

In [0]:
clean_pre2_count = clean_pre2.count()
print('count: {:,}'.format(clean_pre2_count))

In [0]:
clean_pre2.groupBy('r_t_trc_st_w').agg(count('r_t_trc_st_w')).show()

In [0]:
del_w = clean_pre2.filter(col('r_t_trc_st_w') == 'W')

In [0]:
# Delete the T records that were matched by a correction, then drop the two
# helper sequence-number columns.
clean_pre2 = (
    clean_pre2
    .filter(col('r_t_trc_st_w').isNull())
    .drop('r_t_mod_msg_seq_nb', 'r_t_mod_orig_msg_seq_nb')
)

clean_pre2_after_W_count = clean_pre2.count()

print(f'clean_pre2 after removing W: {clean_pre2_after_W_count:,}')
print('------------------------------------------')
print(f'clean_pre2 initial: {clean_pre2_count:,}')
removed_by_w = clean_pre2_count - clean_pre2_after_W_count
print(f'diff: {removed_by_w:,}')

In [0]:
# Replace T records with their corresponding W records:
# flag the W records that matched a T record in the previous step.
rep_w_pairs = [
    ('t_cusip_id', 't_cusip_id'),
    ('t_trd_exctn_dt', 't_trd_exctn_dt'),
    ('t_msg_seq_nb', 'r_t_mod_msg_seq_nb'),
]

rep_w_cond = None
for rep_left, rep_right in rep_w_pairs:
    rep_w_same = col(f'a.{rep_left}') == col(f'b.{rep_right}')
    rep_w_cond = rep_w_same if rep_w_cond is None else (rep_w_cond & rep_w_same)

rep_w = (
    w_clean.alias('a')
    .join(del_w.alias('b'), rep_w_cond, how='left')
    .select('a.*', 'b.r_t_trc_st_w', 'b.r_t_MOD_msg_seq_nb', 'b.r_t_MOD_orig_msg_seq_nb')
    .distinct()
)

In [0]:
rep_w_count = rep_w.count()
rep_w = rep_w.filter(col('r_t_trc_st_w')=='W')
rep_w_count_after = rep_w.count()

rep_w = rep_w.drop('r_t_trc_st_w','r_t_MOD_msg_seq_nb','r_t_MOD_orig_msg_seq_nb')

print('before: {:,}'.format(rep_w_count))
print('after: {:,}'.format(rep_w_count_after))
print('diff : {:,}'.format(rep_w_count - rep_w_count_after))

In [0]:
rep_w = rep_w.dropDuplicates(['t_cusip_id','t_trd_exctn_dt','t_msg_seq_nb','t_orig_msg_seq_nb','t_rptd_pr','t_entrd_vol_qt'])
rep_w = rep_w.orderBy(col('t_cusip_id'),col('t_trd_exctn_dt'),col('t_msg_seq_nb'),col('t_orig_msg_seq_nb'),col('t_rptd_pr'),col('t_entrd_vol_qt'))

In [0]:
#change name of columns for union

#old_columns = [x for x in rep_w.columns]
#new_column_name_list= list(map(lambda x: x.replace("w_", ""), rep_w.columns))

#merged_list = [(old_columns[i], new_column_name_list[i]) for i in range(0, len(old_columns))]


#for x, y in merged_list:
#  rep_w = rep_w.withColumnRenamed(x, y)

#drop other colums in clean_pre2 which I do not need anymore  
clean_pre2 = clean_pre2.drop('trc_st_c','r_t_trc_st_w','r_t_mod_msg_seq_nb','r_t_mod_orig_msg_seq_nb')

In [0]:
len(clean_pre2.columns), len(rep_w.columns)

In [0]:
# Combine the cleaned T records and correct replacement W records;
clean_pre3 =  clean_pre2.unionByName(rep_w)

clean_pre3_count = clean_pre3.count()

In [0]:
clean_pre3.groupBy('t_asof_cd').agg(count('t_asof_cd')).show()

In [0]:
clean_pre1_count = clean_pre1.count()

In [0]:
print('** Summary: Cleaning Step 2.2 **')
print('Observations before cleaning: {:,}'.format(clean_pre1_count))
print('Observations after removing cancellations and corrections: {:,} '.format(clean_pre3_count))
print('Percentage cleaned: {:,}%'.format(round(((clean_pre3_count-clean_pre1_count)/clean_pre1_count*100*-1),2)))

#58,011,189

In [0]:
clean_pre3_count = clean_pre3.count()

#### Reversals

In [0]:
# Pick out the reversal records (as-of code 'R'), keeping only the columns
# needed to match them against the trades they reverse.
rev_header_cols = [
    't_cusip_id',
    't_bond_sym_id',
    't_trd_exctn_dt',
    't_trd_exctn_tm',
    't_entrd_vol_qt',
    't_rptd_pr',
    't_rpt_side_cd',
    't_cntra_mp_id',
    't_trd_rpt_dt',
    't_trd_rpt_tm',
]
is_reversal = col('t_asof_cd')== 'R'
rev_header = clean_pre3.filter(is_reversal).select(*rev_header_cols)

In [0]:
# Number the reversals within each group of identical
# (CUSIP, symbol, execution date, volume, price, side, contra party).
#
# The window used to be ordered by the partition columns themselves. Those are
# constant inside a partition, so row_number() was assigned in arbitrary order
# and could change between runs. The rows are now ordered by execution time,
# report date and report time, which makes 'seq' reproducible.
# NOTE(review): this mirrors the sort order of the SAS procedure this notebook
# ports, as far as it is known here - confirm against the reference program.
rev_group_cols = [
    't_cusip_id',
    't_bond_sym_id',
    't_trd_exctn_dt',
    't_entrd_vol_qt',
    't_rptd_pr',
    't_rpt_side_cd',
    't_cntra_mp_id',
]
rev_order_cols = ['t_trd_exctn_tm', 't_trd_rpt_dt', 't_trd_rpt_tm']

windowSpec = Window.partitionBy(*rev_group_cols).orderBy(*rev_order_cols)

rev_header_6 = rev_header.withColumn('seq', row_number().over(windowSpec))

In [0]:
rev_header_6.groupby('seq').agg(count(col('seq'))).sort(col('count(seq)').desc()).show(n=5)

In [0]:
clean_pre3.groupBy('t_asof_cd').agg(count('t_asof_cd')).sort(col('count(t_asof_cd)').desc()).toPandas().head()

Unnamed: 0,t_asof_cd,count(t_asof_cd)
0,A,984818
1,R,541973
2,D,763
3,X,9
4,,0


In [0]:
clean_pre3_count = clean_pre3.count()

In [0]:
# Create the same ordering among the non-reversal records;
# Remove records that are R (reversal) D (Delayed dissemination) and X (delayed reversal);
clean_pre4 = clean_pre3.filter((col('t_asof_cd').isNull()) |
                               (col('t_asof_cd') == 'A')
                              )

clean_pre4_count = clean_pre4.count()

clean_pre4_header = clean_pre4.select('t_cusip_id',
                                    't_bond_sym_id',
                                    't_trd_exctn_dt',
                                    't_trd_exctn_tm',
                                    't_entrd_vol_qt',
                                    't_rptd_pr',
                                    't_rpt_side_cd',
                                    't_cntra_mp_id',
                                    't_trd_rpt_dt',
                                    't_trd_rpt_tm',
                                    't_msg_seq_nb')

In [0]:
print('Observations clean_pre3_count: {:,}'.format(clean_pre3_count))
print('Observations clean_pre4_count: {:,}'.format(clean_pre4_count))

In [0]:
# Match by 6 keys (excluding execution time): number the non-reversal records
# within each group of identical
# (CUSIP, symbol, execution date, volume, price, side, contra party).
#
# The window used to be ordered by the partition columns themselves. Those are
# constant inside a partition, so row_number() was assigned in arbitrary order
# and the trade paired with a given reversal could change between runs. The
# rows are now ordered by execution time, report date, report time and message
# sequence number, which makes 'seq6' reproducible.
# NOTE(review): this mirrors the sort order of the SAS procedure this notebook
# ports, as far as it is known here - confirm against the reference program.
seq6_group_cols = [
    't_cusip_id',
    't_bond_sym_id',
    't_trd_exctn_dt',
    't_entrd_vol_qt',
    't_rptd_pr',
    't_rpt_side_cd',
    't_cntra_mp_id',
]
seq6_order_cols = ['t_trd_exctn_tm', 't_trd_rpt_dt', 't_trd_rpt_tm', 't_msg_seq_nb']

windowSpec = Window.partitionBy(*seq6_group_cols).orderBy(*seq6_order_cols)

clean_pre4_header = clean_pre4_header.withColumn('seq6', row_number().over(windowSpec))

In [0]:
#Join Reversal with Non-Reversal to delete the corresponding ones

clean_pre5_header = (
    clean_pre4_header.alias('a')
      .join(
        rev_header_6.alias('b'),
          ((col('a.t_cusip_id') == col('b.t_cusip_id'))
          &(col('a.t_trd_exctn_dt') == col('b.t_trd_exctn_dt'))
          &(col('a.t_entrd_vol_qt') == col('b.t_entrd_vol_qt'))
          &(col('a.t_rptd_pr') == col('b.t_rptd_pr'))
          &(col('a.t_rpt_side_cd') == col('b.t_rpt_side_cd'))
          &(col('a.t_cntra_mp_id') == col('b.t_cntra_mp_id'))
          &(col('a.seq6') == col('b.seq'))),
          how='left'
    ) \
    .select("a.*", col('b.seq').alias('rev_seq6')).distinct()
)

In [0]:
#Removed matched records and drop columns which I do not need anymore
clean_pre5_header = clean_pre5_header.filter(col('rev_seq6').isNull()).drop('rev_seq6','seq')

In [0]:
# Join the surviving headers back onto the full records, keeping only the
# trades that were NOT reversed.
#
# This used to be a LEFT join followed by select('a.*'). A left join keeps
# every row of clean_pre4 whether or not it has a surviving header, so no
# reversed trade was ever removed. The SAS step this ports is an inner join;
# a left-semi join gives the same rows without duplicating any of them.
#
# Keys are compared null-safely, so a record with a missing value in one of
# the key columns still matches its own header instead of being dropped.
join_back_keys = [
    't_cusip_id',
    't_trd_exctn_dt',
    't_trd_exctn_tm',
    't_entrd_vol_qt',
    't_rptd_pr',
    't_rpt_side_cd',
    't_cntra_mp_id',
    't_msg_seq_nb',
    't_trd_rpt_dt',
    't_trd_rpt_tm',
]

join_back_cond = None
for join_back_key in join_back_keys:
    join_back_same = col(f'a.{join_back_key}').eqNullSafe(col(f'b.{join_back_key}'))
    join_back_cond = join_back_same if join_back_cond is None else (join_back_cond & join_back_same)

clean_pre5 = (
    clean_pre4.alias('a')
    # left_semi returns only the columns of clean_pre4
    .join(clean_pre5_header.alias('b'), join_back_cond, how='left_semi')
    .distinct()
)

clean_pre5_count = clean_pre5.count()

In [0]:
# Reference only: a SAS step kept as a bare string literal, so it is never
# executed. It builds _clean_pre5 by joining _clean_pre4 with
# _clean_pre5_header on ten keys (tables listed in FROM, keys in WHERE).
'''proc sql;
 create table _clean_pre5 as select distinct a.*
 from _clean_pre4 as a , _clean_pre5_header as b
 where a.cusip_id=b.cusip_id
 and a.trd_exctn_dt = b.trd_exctn_dt
 and a.trd_exctn_tm = b.trd_exctn_tm
 and a.entrd_vol_qt = b.entrd_vol_qt
 and a.rptd_pr 		= b.rptd_pr
 and a.rpt_side_cd  = b.rpt_side_cd
 and a.cntra_mp_id  = b.cntra_mp_id
 and a.msg_seq_nb 	= b.msg_seq_nb
 and a.trd_rpt_dt   = b.trd_rpt_dt
 and a.trd_rpt_tm 	= b.trd_rpt_tm;
quit; ***   92,965,767;
'''

In [0]:
print('** Summary: Cleaning Step 2.3 **')
print('Observations before cleaning: {:,}'.format(clean_pre3_count))
print('Observations after removing cancellations and corrections: {:,} '.format(clean_pre5_count))
print('Percentage cleaned: {:,}%'.format(round(((clean_pre5_count-clean_pre3_count)/clean_pre3_count*100*-1),2)))

In [0]:
print('** Summary: Cleaning Step 2 **')
print('Observations before cleaning: {:,}'.format(pre_T_starting_obs))
print('Observations after removing cancellations and corrections: {:,} '.format(clean_pre5_count))
print('Percentage cleaned: {:,}%'.format(round(((clean_pre5_count-pre_T_starting_obs)/pre_T_starting_obs*100*-1),2)))

## Combining Post and Pre Data

In [0]:
clean_post2 = clean_post2.drop('trc_st_xc','trc_st_y')

In [0]:
# Combine the pre and post data togetehr
cleaned_data = clean_post2.unionByName(clean_pre5)
cleaned_data_count = cleaned_data.count()

In [0]:
# Persist the cleaned data as a Delta table, replacing any previous version.
tables = "maximilian"
table_name = "trace_dn_all_cleaned"

target_table = f'{tables}.{table_name}'
(
    cleaned_data.write
    .mode('overwrite')
    .format('delta')
    .saveAsTable(target_table)
)

## Total stats

In [0]:
# Overall summary: unfiltered input rows versus rows in the final cleaned table.
# The percentage line now ends in '%' like every other summary in the notebook.
print('** Summary: Cleaning **')
print('Observations before cleaning: {:,}'.format(before_clean_count))
print('Observations after all: {:,} '.format(cleaned_data_count))
print('Percentage cleaned: {:,}%'.format(round(((cleaned_data_count-before_clean_count)/before_clean_count*100*-1),2)))