In [1]:
import pandas as pd
from pandas.tseries.offsets import MonthEnd, MonthBegin, BDay, CustomBusinessDay
import swifter
import ray
import numpy as np
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, DateType, TimestampType, FloatType, IntegerType
from pyspark.sql.functions import udf
from scipy.optimize import fsolve, minimize_scalar
import os
import shutil
import json

# Bond Research Project: Exploring Signals for Duration Adjusted Returns

# TRACE and Mergent Filtering and Merging (Initial Sample Filtering)

## Functions

In [2]:
def get_wrds_dtype_dict(dirty_str):
    '''
    Parameters:
    dirty_str (str): wrds feature descriptions table text when copied and pasted
    Returns:
    dict: dictionary where keys are feature names and values are dtypes
    '''
    dtype_dict = {}
    str_lines = dirty_str.split("\n")
    for line in str_lines:
        line_contents = line.split("\t")
        dtype_dict[line_contents[0]] = line_contents[1]
    return dtype_dict


def wrds_to_spark_dtype_dict(wrds_dtype_dict,force_custom=False):
    '''
    Parameters:
    wrds_dtype_dict (dict): dictionary of wrds table features (keys) and dtypes as they appear in wrds (values)
    force_custom (bool): when True, any wrds table feature that satisfies a certain condition is forced to be cast as different spark dtype than suggested by wrds
    Returns:
    dict: dictionary of wrds table features (keys) and dtype objects from spark.sql.types
    '''
    spark_dtype_dict = {}
    wrds_to_spark_dtype_map = {
        "Char": StringType(),
        "Date": DateType(),
        "Float": FloatType(),
        "Time": TimestampType()
    }
    for feature, wrds_dtype in wrds_dtype_dict.items():
        spark_dtype_dict[feature] = wrds_to_spark_dtype_map[wrds_dtype]
        if force_custom: 
            if "_tm" in feature:
                spark_dtype_dict[feature] = TimestampType()
            if feature in ["days_to_sttl_ct", "naics_code"]:
                spark_dtype_dict[feature] = IntegerType()
            
    return spark_dtype_dict


def get_spark_schema(raw_csv_cols,dirty_wrds_str,force_custom=False):
    '''
    Parameters:
    dirty_wrds_str (str): wrds feature descriptions table text when copied and pasted
    force_custom (bool): when True, any wrds table feature that satisfies a certain condition is forced to be cast as different spark dtype than suggested by wrds
    Returns:
    StructType: structype schema for wrds csv
    '''
    wrds_dtype_dict = get_wrds_dtype_dict(dirty_wrds_str)
    spark_dtype_dict = wrds_to_spark_dtype_dict(wrds_dtype_dict,force_custom)
    schema = StructType([StructField(feature,spark_dtype_dict[feature],True) for feature in raw_csv_cols])
    return schema, spark_dtype_dict, wrds_dtype_dict

In [3]:
def spark_dtype_to_str_map(dict_of_dtypes, setting=0):
    '''
    Parameters:
    dict_of_dtypes (dict): dictionary where values are feature column names from TRACE or MERGENT and values are spark dtype objects or str representations of them
    setting (int): 0 = object to string, 1 = string to object
    Returns:
    dict: dictionary where values are feature column names from TRACE or MERGENT and values are the swapped spark dtype objects or str representations of them
    '''
    output_dict = {}
    str_to_obj_map = {
        "StringType": StringType(),
        "DateType": DateType(),
        "FloatType": FloatType(),
        "TimestampType": TimestampType(),
        "IntegerType": IntegerType()
    }
    for key,value in dict_of_dtypes.items():
        if setting==1:
            output_dict[key] = str_to_obj_map[value]
        if setting==0:
            if value == StringType():
                output_dict[key] = "StringType"
            if value == DateType():
                output_dict[key] = "DateType"
            if value == FloatType():
                output_dict[key] = "FloatType"
            if value == TimestampType():
                output_dict[key] = "TimestampType"
            if value == IntegerType():
                output_dict[key] = "IntegerType"
                
    return output_dict

In [4]:
def get_spark_schema_2(list_of_cols,return_cols_by_origin=False):
    '''
    Parameters:
    list_of_cols (list): list of col names
    return_cols_by_origin (bool): if True, returns dictionary of cols according to whether they are from the TRACE or mergent dataset
    Returns:
    StructType: spark schema for dataframe
    '''
    with open("trace_spark_dtype_dict.json","r") as file:
        trace_spark_dtype_str_dict = json.load(file)
    trace_spark_dtype_dict = spark_dtype_to_str_map(trace_spark_dtype_str_dict,1)
    with open("mergent_spark_dtype_dict.json","r") as file:
        mergent_spark_dtype_str_dict = json.load(file)
    mergent_spark_dtype_dict = spark_dtype_to_str_map(mergent_spark_dtype_str_dict,1)
    df_spark_dtype_dict = {}
    for col in list_of_cols:
        if col in trace_spark_dtype_dict.keys():
            df_spark_dtype_dict[col] = trace_spark_dtype_dict[col]
        else:
            df_spark_dtype_dict[col] = mergent_spark_dtype_dict[col]
    df_schema = StructType([StructField(col,df_spark_dtype_dict[col],True) for col in df_spark_dtype_dict.keys()])
    if return_cols_by_origin:
        cols_by_origin_dict = {}
        trace_cols = list(set(list_of_cols) & set(list(trace_spark_dtype_dict.keys())))
        mergent_cols = list(set(list_of_cols) & set(list(mergent_spark_dtype_dict.keys())))
        cols_by_origin_dict["trace"] = trace_cols
        cols_by_origin_dict["mergent"] = mergent_cols
        return df_schema, cols_by_origin_dict
    return df_schema

## Start Spark Session

In [5]:
import pyspark
print(pyspark.__version__)

3.5.5


In [6]:
os.environ["HADOOP_HOME"] = r"C:\Users\wvill\hadoop"
os.environ["hadoop.home.dir"] = r"C:\Users\wvill\hadoop"
os.environ["HADOOP_OPTIONAL_TOOLS"] = "false"
os.environ["SPARK_DRIVER_MEMORY"] = "6g"
os.environ["SPARK_EXECUTOR_MEMORY"] = "6g"
os.environ["PYSPARK_PYTHON"] = r"C:\Users\wvill\AppData\Local\Programs\Python\Python311\python.exe"
os.environ["PYSPARK_DRIVER_PYTHON"] = r"C:\Users\wvill\AppData\Local\Programs\Python\Python311\python.exe"

In [7]:
spark = SparkSession.builder.appName("WRDS_CSV_Processor").config("spark.hadoop.io.native.lib.available", "false").getOrCreate()

In [8]:
print(os.getenv("HADOOP_HOME"))

C:\Users\wvill\hadoop


In [9]:
spark.conf.get("spark.hadoop.io.native.lib.available")

'false'

## WRDS Variable Description Strings

In [10]:
trace_enhanced_dtype_str_dirty="""cusip_id	Char	CUSIP ID (cusip_id)
bond_sym_id	Char	TRACE Bond Symbol (bond_sym_id)
company_symbol	Char	Company Symbol (issuer stock ticker) (company_symbol)
trd_exctn_dt	Date	Execution Date (trd_exctn_dt)
trd_exctn_tm	Float	Execution Time (trd_exctn_tm)
trd_rpt_dt	Date	Report Date (trd_rpt_dt)
trd_rpt_tm	Float	Report Time (trd_rpt_tm)
msg_seq_nb	Char	Message Sequence Number (msg_seq_nb)
trc_st	Char	Trade Status (trc_st)
scrty_type_cd	Char	Security Type (scrty_type_cd)
wis_fl	Char	When Issued Indicator (wis_fl)
cmsn_trd	Char	Commission Indicator (cmsn_trd)
entrd_vol_qt	Float	Quantity (entrd_vol_qt)
rptd_pr	Float	Price (rptd_pr)
yld_sign_cd	Char	Yield Direction (yld_sign_cd)
yld_pt	Float	Yield (yld_pt)
asof_cd	Char	As Of Indicator (asof_cd)
days_to_sttl_ct	Char	Seller Sales Day (days_to_sttl_ct)
sale_cndtn_cd	Char	Sale Condition (sale_cndtn_cd)
sale_cndtn2_cd	Char	Second Modifier (sale_cndtn2_cd)
rpt_side_cd	Char	Buy/Sell Indicator (rpt_side_cd)
buy_cmsn_rt	Float	Buyer Commission (buy_cmsn_rt)
buy_cpcty_cd	Char	Buyer Capacity (buy_cpcty_cd)
sell_cmsn_rt	Float	Seller Commission (sell_cmsn_rt)
sell_cpcty_cd	Char	Seller Capacity (sell_cpcty_cd)
cntra_mp_id	Char	Contra Party Indicator (cntra_mp_id)
agu_qsr_id	Char	AGU Indicator (agu_qsr_id)
spcl_trd_fl	Char	Special Price Indicator (spcl_trd_fl)
trdg_mkt_cd	Char	Trading Market Indicator (trdg_mkt_cd)
dissem_fl	Char	Dissemination Flag (dissem_fl)
orig_msg_seq_nb	Char	Original Message Sequence Number (orig_msg_seq_nb)
bloomberg_identifier	Char	Bloomberg Identifier (bloomberg_identifier)
sub_prdct	Char	Sub-Product (sub_prdct)
stlmnt_dt	Date	Settlement Date (stlmnt_dt)
trd_mod_3	Char	Trade Modifier 3 (trd_mod_3)
trd_mod_4	Char	Trade Modifier 4 (trd_mod_4)
rptg_party_type	Char	Reporting Party Type (rptg_party_type)
lckd_in_ind	Char	Locked In Indicator (lckd_in_ind)
ats_indicator	Char	ATS Indicator (ats_indicator)
pr_trd_dt	Date	Prior Trade Report Date (pr_trd_dt)
first_trade_ctrl_date	Date	First Trade Control Date (first_trade_ctrl_date)
first_trade_ctrl_num	Char	First Trade Control Number (first_trade_ctrl_num)"""

In [11]:
mergent_bond_issue_dtype_str_dirty = """issue_id	Float	Issue ID (issue_id)
issuer_id	Float	Issuer ID (issuer_id)
issuer_id_affected	Float	Issuer ID (issuer_id_affected)
issuer_cusip	Char	Issuer CUSIP (issuer_cusip)
issue_cusip	Char	Issue CUSIP (issue_cusip)
complete_cusip	Char	Complete CUSIP (complete_cusip)
cusip_name	Char	CUSIP Name (cusip_name)
prospectus_issuer_name	Char	Prospectus Issuer Name (prospectus_issuer_name)
issue_name	Char	Issue Name (issue_name)
sedol	Char	SEDOL (sedol)
isin	Char	ISI Number (isin)
reallowance	Float	Reallowance (reallowance)
putable	Char	Putable (putable)
principal_amt	Float	Principal Amount (principal_amt)
press_release	Char	Press Release (press_release)
preferred_security	Char	Preferred Security (preferred_security)
perpetual	Char	Perpetual (perpetual)
overallotment_opt	Char	Overallotment Option (overallotment_opt)
oid	Char	Orig Issue Discount (oid)
offering_yield	Float	Offering Yield (offering_yield)
offering_price	Float	Offering Price (offering_price)
offering_date	Date	Offering Date (offering_date)
offering_amt	Float	Offering Amount (in thousands) (offering_amt)
mtn	Char	Medium Term Note (mtn)
maturity	Date	Maturity Date (maturity)
issue_offered_global	Char	Global Offer (issue_offered_global)
gross_spread	Float	Gross Spread (gross_spread)
fungible	Char	Fungible (fungible)
form_of_own	Char	Form of Ownership (form_of_own)
foreign_currency	Char	Foreign Currency (foreign_currency)
exchangeable	Char	Exchangeable (exchangeable)
enhancement	Char	Enhancement (enhancement)
dep_eligibility	Char	Eligible Depositories (dep_eligibility)
denomination	Char	Denomination (denomination)
delivery_date	Date	Delivery Date (delivery_date)
defeased_date	Date	Defeased Date (defeased_date)
defeased	Char	Defeased (defeased)
defeasance_type	Char	Defeasance Type (defeasance_type)
defaulted	Char	Defaulted (defaulted)
covenants	Char	Covenants (covenants)
coupon_type	Char	Coupon Type (coupon_type)
convertible	Char	Convertible (convertible)
comp_neg_exch_deal	Char	Type of Sale (comp_neg_exch_deal)
canadian	Char	Canadian Issuer (canadian)
bond_type	Char	Bond Type (bond_type)
asset_backed	Char	Asset Backed (asset_backed)
announced_call	Char	Announced Call (announced_call)
active_issue	Char	Active Issue (active_issue)
private_placement	Char	Private Placement (private_placement)
yankee	Char	Yankee (yankee)
unit_deal	Char	Unit Deal (unit_deal)
treasury_spread	Float	Treasury Spread (treasury_spread)
treasury_maturity	Char	Treasury Maturity (treasury_maturity)
tender_exch_offer	Char	Tender/Exchange Offer (tender_exch_offer)
subsequent_data	Char	Subsequent Data (subsequent_data)
slob	Char	SLOB (slob)
settlement_type	Char	Settlement Type (settlement_type)
selling_concession	Float	Selling Concession (selling_concession)
security_pledge	Char	Security Pledge (security_pledge)
security_level	Char	Security Level (security_level)
sec_reg_type2	Char	Additional SEC Registration Type (sec_reg_type2)
sec_reg_type1	Char	SEC Registration Type (sec_reg_type1)
rule_415_reg	Char	Rule 415 Registration (rule_415_reg)
rule_144a	Char	Rule 144a (rule_144a)
registration_rights	Char	Registration Rights (registration_rights)
refunding_date	Date	Refunding Date (refunding_date)
refund_protection	Char	Refund Protection (refund_protection)
redeemable	Char	Redeemable (redeemable)
determination_date	Date	The next date as of which the coupon for floating rate securities is calculated. (determination_date)
see_note	Char	Indicates that more detailed information on the coupon formula is included in the associated text footnote. (see_note)
reset_date_orig	Date	Represents the first date as of which the issue began to accrue interest at the first calculated coupon rate. (reset_date_orig)
reset_date	Date	The next date as of which the issue will begin to accrue interest at the new coupon rate. (reset_date)
lesser_of	Char	A flag indicating that the issues coupon resets to the lesser of the associated rate calculation formulas in the INDEX table. (lesser_of)
greater_of	Char	A flag indicating that the issues coupon resets to the greater of the associated rate calculation formulas in the INDEX table. (greater_of)
fix_frequency	Char	A four-letter code indicating how often the coupon will be reset. (fix_frequency)
determination_date_orig	Date	Represents the first date a new interest rate was calculated for this issue. (determination_date_orig)
pay_in_kind_exp_date	Date	Expiration date of the pay_in_kind option. (pay_in_kind_exp_date)
pay_in_kind	Char	Flag indicating that the interest on the issue may be paid in more of the same security, or in some cases, in other securities. (pay_in_kind)
last_interest_date	Date	Last interest payment date (last_interest_date)
interest_frequency	Char	Code indicating how often interest payments will be made. (interest_frequency)
first_interest_date	Date	Date on which first interest payment will be made to the bondholder. (first_interest_date)
day_count_basis	Char	Basis used for determining the interest paid during each interest period. (day_count_basis)
dated_date	Date	Date from which interest accrues or from which original issue discount is amortized. (dated_date)
coupon_change_indicator	Char	Indicates the issues coupon type, if not fixed. (coupon_change_indicator)
coupon	Float	The current applicable annual interest rate. (coupon)
soft_call_make_whole	Char	A flag indicating the soft call feature contains a make whole payment provision specified in the convertible_additional_terms table. (soft_call_make_whole)
shares_outstanding	Float	The total number of securities/shares of the conversion commodity outstanding (shares_outstanding)
reason	Char	The reason for the change of the convertible information on the change_date. (reason)
qty_of_commod	Float	The quantity of the conversion commodity (qty_of_commod)
percs	Char	A flag indicating the security has maximum payoff upon conversion specified in the convertible_additional_terms table. (percs)
percent_of_outstanding_commod	Float	The percentage of the total conversion commodity available through conversion. (percent_of_outstanding_commod)
peps	Char	A flag indicating the issue has additional conversion terms. (peps)
orig_shares_outstanding	Float	The total number of securities/shares of the conversion commodity when the issue was first offered. (orig_shares_outstanding)
orig_qty_of_commod	Float	The quantity of the conversion commodity that the bondholder would have received when the security was first offered. (orig_qty_of_commod)
orig_percent_outstanding_com	Float	The percentage of the total conversion commodity available through conversion of the issue when the issue was first offered. (orig_percent_outstanding_com)
orig_conv_price	Float	Conversion price of the conversion commodity when the issue was first offered. (orig_conv_price)
orig_conv_premium	Float	The percentage difference between the conversion price and the market price when the issue was first offered. (orig_conv_premium)
orig_commod_price	Float	Market price of the conversion commodity when the issue was first offered. (orig_commod_price)
exchange	Char	A four-letter code indicating the conversion commoditys principal securities exchange. (exchange)
dilution_protection	Char	A flag indicating whether or not the quantity of commodity will be adjusted for stock splits, dividends, distributions, or other specified events. (dilution_protection)
convert_on_call	Char	A flag indicating whether the holder is able to convert their security if it is called for redemption. (convert_on_call)
conv_total_trade_days	Float	Number of trading days during which the conversion commoditys price is measured for lifting redemption restrictions. (conv_total_trade_days)
conv_redemp_exception	Char	Flag indicating whether a soft call provision has been specified. (conv_redemp_exception)
conv_redemp_date	Date	End date of current soft call feature. (conv_redemp_date)
conv_prohibited_from	Date	Start date of current soft call feature. (conv_prohibited_from)
conv_price_percent	Float	Minimum percentage of the applicable conversion price at which the conversion commodity must trade at before restrictions on redemption are lifted. (conv_price_percent)
conv_price	Float	Conversion price of the conversion commodity. (conv_price)
conv_premium	Float	The percentage difference between the conversion price and the market price. (conv_premium)
conv_period_spec	Char	Indicates whether or not the bondholder must submit their security for conversion in a specified time period prior to redemption. (conv_period_spec)
conv_period_days	Float	Number of days prior to the announced redemption date that the bondholders are allowed to convert their bonds. (conv_period_days)
conv_part_trade_days	Float	The number of trading days that the conversion security must trade at or above a certain price to allow redemption. (conv_part_trade_days)
conv_exp_date	Date	Date on which conversion privilege expires. (conv_exp_date)
conv_eff_date	Date	First date on which issue may be converted. (conv_eff_date)
conv_commod_type	Char	A four-letter code indicating what the conversion commodity is. (conv_commod_type)
conv_commod_issuer	Char	The name of the issuer of the exchange commodity. (conv_commod_issuer)
conv_commod_cusip	Char	The CUSIP of the conversion commodity. (conv_commod_cusip)
conv_cash	Float	Additional cash payable by bondholder to effect conversion. (conv_cash)
conditional_conv_terms	Char	A flag indicating the security has conditional conversion terms specified in the convertible_additional_terms table. (conditional_conv_terms)
commod_price	Float	Market price of the conversion commodity. (commod_price)
change_date	Date	The date the current convertible information was added to the issue. (change_date)
as_of_date	Date	The date the current convertible information was added to the issue. (as_of_date)
ticker	Char	The ticker of the conversion commodity. (ticker)
split_ratio	Char	The ratio to which the conversion commodity was split. (split_ratio)
split_date	Date	The date of the last known conversion commodity split. (split_date)
sc_make_whole_start_date	Date	The date from which a soft call make-whole payment becomes payable if the bond is called for redemption. (sc_make_whole_start_date)
sc_make_whole_initial_amount	Float	The initial soft call make-whole payment amount that would be payable if the bond were called for redemption on the soft call make-whole start date. (sc_make_whole_initial_amount)
sc_make_whole_end_date	Date	The final date at which a soft call make-whole payment is payable if the bond is called for redemption. (sc_make_whole_end_date)
sc_make_whole_decrement_type	Char	A four-letter code indicating how the soft call make-whole payment decreases over time when the bond is called for redemption. (sc_make_whole_decrement_type)
sc_make_whole_change_percent	Float	The percentage (of the face value) by which the soft call make-whole payment payable is reduced on an annual basis if the bond is called for redemption. (sc_make_whole_change_percent)
percs_max_payoff	Float	Maximum payoff bondholder will receive upon conversion or at maturity per PERCS. (percs_max_payoff)
peps_min_conversion_ratio	Float	Minimum value of the conversion ratio. (peps_min_conversion_ratio)
peps_max_conversion_ratio	Float	Maximum value of the conversion ratio. (peps_max_conversion_ratio)
peps_lower_price	Float	Conversion commodity price at which maximum conversion ratio applies. (peps_lower_price)
peps_issue_price	Float	The face value or principal amount of the bond. (peps_issue_price)
peps_higher_price	Float	Conversion commodity price at which minimum conversion ratio applies. (peps_higher_price)
coco_trigger_expressed_as	Char	A four-letter code indicating how the coco_initial_trigger_percent is expressed. (coco_trigger_expressed_as)
coco_trade_days_in_previous	Char	The time period in which the coco_trade_days are measured. (coco_trade_days_in_previous)
coco_trade_days	Char	The number of trading days that the conversion commodity must trade at or above the trigger percent. (coco_trade_days)
coco_start_date	Date	Date at which conversion becomes conditional. (coco_start_date)
coco_min_trigger_level	Float	The lowest number the trigger percent will reach. (coco_min_trigger_level)
coco_initial_trigger_percent	Float	Stock price level initially required to trigger conditional conversion. (coco_initial_trigger_percent)
coco_end_date	Date	Last date that the conditional conversion terms apply. (coco_end_date)
coco_change_rate	Float	Annual percentage change of the coco_initial_trigger_percent. (coco_change_rate)
coco_change_frequency	Char	A four-letter code indicating the change frequency of the trigger percent. (coco_change_frequency)
action_type	Char	Action Type (action_type)
action_price	Float	Action Price (action_price)
action_amount	Float	Action Amount (action_amount)
effective_date	Date	Effective Date (effective_date)
amount_outstanding	Float	Amount Outstanding (amount_outstanding)
voting_power_percentage_erp	Float	Voting Power Percentage Erp (voting_power_percentage_erp)
voting_power_percentage	Float	Voting Power Percentage (voting_power_percentage)
rating_decline_trigger_put	Char	Rating Decline Trigger Put (rating_decline_trigger_put)
rating_decline_provision	Char	Rating Decline Provision (rating_decline_provision)
negative_pledge_covenant	Char	Negative Pledge Covenant (negative_pledge_covenant)
legal_defeasance	Char	Legal Defeasance (legal_defeasance)
economic_cov_def	Char	Economic Cov Def (economic_cov_def)
defeasance_wo_tax_conseq	Char	Defeasance w.o. Tax Conseq (defeasance_wo_tax_conseq)
declining_net_worth_trigger	Float	Declining Net Worth Trigger (declining_net_worth_trigger)
declining_net_worth_provisions	Char	Declining Net Worth Provisions (declining_net_worth_provisions)
declining_net_worth_percentage	Float	Declining Net Worth Percentage (declining_net_worth_percentage)
declining_net_worth	Char	Declining Net Worth (declining_net_worth)
cross_default	Char	Cross Default (cross_default)
cross_acceleration	Char	Cross Acceleration (cross_acceleration)
covenant_defeas_wo_tax_conseq	Char	Covenant Defeas w.o. Tax (covenant_defeas_wo_tax_conseq)
change_control_put_provisions	Char	Change Control Put Provisions (change_control_put_provisions)
asset_sale_clause	Char	Asset Sale Clause (asset_sale_clause)
after_acquired_property_clause	Char	After Acquired Property Clause (after_acquired_property_clause)
currency	Char	Currency (currency)
conversion_rate	Float	Conversion Rate (conversion_rate)
amt_offered	Float	Amount Offered (amt_offered)
settlement	Char	Settlement (settlement)
filing_date	Date	Filing Date (filing_date)
sec_cusip	Char	CUSIP of the other security offered as part of the unit. (sec_cusip)
quantity	Float	The quantity of other security offered with the unit. (quantity)
other_sec_type	Char	Type of the other security. (other_sec_type)
other_sec_issuer	Char	The name of the issuer of the other security. (other_sec_issuer)
market_price	Float	The market price of the units other security as of the issues offering date. (market_price)
date_transferable	Date	The date on which each security offered as part of the unit may be sold separately. (date_transferable)
date_subj_adjustment	Char	A flag indicating that the date on which the unit is free to trade as separate security is subject to adjustment. (date_subj_adjustment)
allocated_offering_price_other	Float	Portion of the unit price allocated to the non-debt, non-warrant security in the unit. (allocated_offering_price_other)
overallotment_expiration_date	Date	Overallotment Expiration Date (overallotment_expiration_date)
exercised_date	Date	Exercised Date (exercised_date)
exercised	Char	Exercised (exercised)
amount	Float	Amount (amount)
notification_period	Char	Notification Period (notification_period)
next_put_price	Float	Next Put Price (next_put_price)
next_put_date	Date	Next Put Date (next_put_date)
unit_cusip	Char	The CUSIP of the unit. (unit_cusip)
total_units_offered	Float	Total number of units offered. (total_units_offered)
principal_amt_per_unit	Float	The principal amount of each unit. (principal_amt_per_unit)
allocated_offering_price_unit	Float	From the prospectus, the value of the bond part of the unit. (allocated_offering_price_unit)
parent_id	Float	A number representing the agent_id of this issuers parent company. (parent_id)
naics_code	Char	The North American Industry Classification System code used to describe the business of this issuer. (naics_code)
industry_group	Char	A code indicating the industry group to which the issuer belongs. (industry_group)
industry_code	Char	A code indicating the industry to which the issuer belongs. (industry_code)
in_bankruptcy	Char	A flag denoting that the issuer is currently in bankruptcy. (in_bankruptcy)
esop	Char	A flag denoting that the issue will be repaid by pension contributions to an ESOP. (esop)
country_domicile	Char	The Country of Domicile or country of permanent residence of this issuer. (country_domicile)
agent_id	Float	A unique Mergent-generated number to identify each agent. (agent_id)
sic_code	Char	The firms four-digit Standard Industry Classification code. (sic_code)
transaction_affiliates	Char	Issuer is restricted in certain business dealings with its subsidiaries. (transaction_affiliates)
subordinated_debt_issuance	Char	Restricts issuance of junior or subordinated debt. (subordinated_debt_issuance)
stock_transfer_sale_disp	Char	Restricts the issuer from transferring, selling, or disposing of its own common stock or the common stock of a subsidiary. (stock_transfer_sale_disp)
stock_issuance_issuer	Char	Restricts issuer from issuing additional common stock. (stock_issuance_issuer)
senior_debt_issuance	Char	Restricts issuer to the amount of senior debt it may issue in the future. (senior_debt_issuance)
sales_leaseback_is	Char	Restricts issuer to the type or amount of property used in a sale leaseback transaction and may restrict its use of the proceeds of the sale. (sales_leaseback_is)
sale_assets	Char	Restrictions on the ability of an issuer to sell assets or restrictions on the issuers use of the proceeds from the sale of assets. (sale_assets)
restricted_payments	Char	Restricts issuers freedom to make payments (other than dividend related payments) to shareholders and others. (restricted_payments)
net_earnings_test_issuance	Char	To issue additional debt the issuer must have achieved or maintained certain profitability levels. (net_earnings_test_issuance)
maintenance_net_worth	Char	Issuer must maintain a minimum specified net worth. (maintenance_net_worth)
liens_is	Char	In the case of default, the bondholders have the legal right to sell mortgaged property to satisfy their unpaid obligations. (liens_is)
leverage_test_is	Char	Restricts total-indebtedness of the issuer. (leverage_test_is)
investments	Char	Restricts issuers investment policy to prevent risky investments. (investments)
indebtedness_is	Char	Restricts user from incurring additional debt with limits on absolute dollar amount of debt outstanding or percentage total capital. (indebtedness_is)
funded_debt_is	Char	Restricts issuer from issuing additional funded debt. (funded_debt_is)
fixed_charge_coverage_is	Char	Issuer is required to have a ratio of earnings available for fixed charges, of at least a minimum specified level. (fixed_charge_coverage_is)
dividends_related_payments_is	Char	Payments made to shareholders or other entities may be limited to a certain percentage of net income or some other ratio. (dividends_related_payments_is)
consolidation_merger	Char	A consolidation or merger of the issuer with another entity is restricted. (consolidation_merger)
subsidiary_redesignation	Char	Indicates if restricted subsidiaries may be reclassified as an unrestricted subsidiaries. (subsidiary_redesignation)
subsidiary_guarantee	Char	Subsidiary is restricted from issuing guarantees for the payment of interest and/or principal of certain debt obligations. (subsidiary_guarantee)
stock_issuance	Char	Restricts issuer from issuing additional common stock in restricted subsidiaries. (stock_issuance)
sales_leaseback_sub	Char	Restricts subsidiaries from selling then leasing back assets that provide security for the debtholder. (sales_leaseback_sub)
sale_xfer_assets_unrestricted	Char	Issuer must use proceeds from sale of subsidiaries assets to reduce debt. (sale_xfer_assets_unrestricted)
preferred_stock_issuance	Char	Restricts subsidiaries ability to issue preferred stock. (preferred_stock_issuance)
liens_sub	Char	Restricts subsidiaries from acquiring liens on their property. (liens_sub)
leverage_test_sub	Char	Limits subsidiaries leverage. (leverage_test_sub)
investments_unrestricted_subs	Char	Restricts subsidiaries investments. (investments_unrestricted_subs)
indebtedness_sub	Char	Restricts the total indebtedness of the subsidiaries. (indebtedness_sub)
funded_debt_sub	Char	Restricts issuers subsidiaries from issuing additional funded debt. (funded_debt_sub)
fixed_charge_coverage_sub	Char	Subsidiaries are required to maintain a minimum ratio of net income to fixed charges. (fixed_charge_coverage_sub)
dividends_related_payments_sub	Char	Limits the subsidiaries payment of dividends to a certain percentage of net income or some other ratio. (dividends_related_payments_sub)
borrowing_restricted	Char	Indicates subsidiaries are restricted from borrowing, except from parent. (borrowing_restricted)"""

## Loading TRACE Enhanced Clean Data

In [12]:
trace_df_raw = spark.read.csv("trace_enhanced_clean_data.csv",header=True,inferSchema=False)
trace_df_raw_cols = [x.lower() for x in trace_df_raw.columns]

In [13]:
trace_schema, trace_spark_dtype_dict, trace_wrds_dtype_dict = get_spark_schema(trace_df_raw_cols,
                                                                               trace_enhanced_dtype_str_dirty,
                                                                               force_custom=True)

In [14]:
trace_df = spark.read.csv("trace_enhanced_clean_data.csv",schema=trace_schema,header=True)

In [15]:
trace_spark_dtype_str_dict = spark_dtype_to_str_map(trace_spark_dtype_dict,0)
with open("trace_spark_dtype_dict.json","w") as file:
    json.dump(trace_spark_dtype_str_dict,file)

In [16]:
trace_df.rdd.getNumPartitions()

333

## Loading Mergent Bond Issue Data

In [36]:
mergent_df_raw = spark.read.csv("fisd_bond_issue_data.csv",header=True,inferSchema=False)
mergent_df_raw_cols = [x.lower() for x in mergent_df_raw.columns]

In [37]:
mergent_schema, mergent_spark_dtype_dict, mergent_wrds_dtype_dict = get_spark_schema(mergent_df_raw_cols, 
                                                                                     mergent_bond_issue_dtype_str_dirty,
                                                                                     force_custom=True)

In [19]:
mergent_df = spark.read.csv("fisd_bond_issue_data.csv",schema=mergent_schema,header=True)

In [38]:
mergent_spark_dtype_str_dict = spark_dtype_to_str_map(mergent_spark_dtype_dict,0)
with open("mergent_spark_dtype_dict.json","w") as file:
    json.dump(mergent_spark_dtype_str_dict,file)

In [21]:
mergent_df.rdd.getNumPartitions()

16

## Filters proposed by Binsbergen, et al. (2023)

### Statistics of TRACE features

In [27]:
trace_df.count()

274781255

In [155]:
for col in ["wis_fl", "spcl_trd_fl", "lckd_in_ind"]:
    trace_df.groupBy(col).count().show()

+------+---------+
|wis_fl|    count|
+------+---------+
|     Y|    14374|
|     N|274766881|
+------+---------+

+-----------+---------+
|spcl_trd_fl|    count|
+-----------+---------+
|       NULL|274515574|
|          Y|   265681|
+-----------+---------+

+-----------+---------+
|lckd_in_ind|    count|
+-----------+---------+
|       NULL|269001182|
|          Y|  5780073|
+-----------+---------+



In [159]:
trace_df.select("days_to_sttl_ct").describe().show()

+-------+-------------------+
|summary|    days_to_sttl_ct|
+-------+-------------------+
|  count|           72400787|
|   mean|0.12488955403205769|
| stddev|  0.858996748297732|
|    min|                  0|
|    max|                 60|
+-------+-------------------+



In [15]:
trace_df.select(F.sum(F.col("days_to_sttl_ct").isNull().cast("int"))).collect()[0][0]

202380468

In [21]:
trace_df_raw.select(F.sum(F.col("days_to_sttl_ct").isNull().cast("int"))).collect()[0][0]

202380468

In [18]:
trace_df.select("days_to_sttl_ct").distinct().show(60, truncate=False)

+---------------+
|days_to_sttl_ct|
+---------------+
|31             |
|28             |
|27             |
|26             |
|44             |
|12             |
|22             |
|47             |
|13             |
|6              |
|16             |
|40             |
|20             |
|5              |
|19             |
|41             |
|15             |
|43             |
|9              |
|17             |
|35             |
|4              |
|8              |
|23             |
|39             |
|7              |
|10             |
|45             |
|38             |
|25             |
|24             |
|29             |
|21             |
|32             |
|11             |
|14             |
|42             |
|2              |
|30             |
|0              |
|18             |
|36             |
|52             |
|54             |
|37             |
|33             |
|53             |
|34             |
|48             |
|50             |
|46             |
|60             |
|56       

In [161]:
trace_df.select(F.min("trd_exctn_dt").alias("Earliest Date"),
                F.max("trd_exctn_dt").alias("Latest Date")).show()

+-------------+-----------+
|Earliest Date|Latest Date|
+-------------+-----------+
|   2002-07-01| 2024-03-28|
+-------------+-----------+



In [16]:
trace_df.select(F.sum(F.col("trd_exctn_dt").isNull().cast("int"))).collect()[0][0]

0

In [26]:
trace_df.filter(F.col("stlmnt_dt") != "9999-12-31").select(F.min("stlmnt_dt").alias("Earliest Date"),
                                                           F.max("stlmnt_dt").alias("Latest Date")
                                                          ).show()


+-------------+-----------+
|Earliest Date|Latest Date|
+-------------+-----------+
|   2008-10-07| 5018-04-22|
+-------------+-----------+



In [25]:
trace_df.select(F.min("stlmnt_dt").alias("Earliest Date"),
                F.max("stlmnt_dt").alias("Latest Date")).show()

+-------------+-----------+
|Earliest Date|Latest Date|
+-------------+-----------+
|   2008-10-07| 9999-12-31|
+-------------+-----------+



In [27]:
trace_df.filter(F.col("stlmnt_dt") != "9999-12-31") \
       .select("stlmnt_dt") \
       .distinct() \
       .orderBy(F.desc("stlmnt_dt")) \
       .show(10, False)


+----------+
|stlmnt_dt |
+----------+
|5018-04-22|
|3021-10-12|
|2929-12-21|
|2916-12-28|
|2520-06-05|
|2222-12-19|
|2217-08-18|
|2202-10-23|
|2202-06-02|
|2202-05-22|
+----------+
only showing top 10 rows



In [24]:
trace_df.select(F.sum(F.col("stlmnt_dt").isNull().cast("int"))).collect()[0][0]

72400787

In [57]:
trace_df.groupBy("scrty_type_cd").count().show(60, truncate=False)

+-------------+---------+
|scrty_type_cd|count    |
+-------------+---------+
|E            |1176390  |
|C            |71224397 |
|NULL         |202380468|
+-------------+---------+



In [58]:
trace_df.groupBy("trdg_mkt_cd").count().show(60, truncate=False)

+-----------+---------+
|trdg_mkt_cd|count    |
+-----------+---------+
|P1         |18352267 |
|S1         |256428988|
+-----------+---------+



In [90]:
trace_df.groupBy("dissem_fl").count().show(60, truncate=False)

+---------+---------+
|dissem_fl|count    |
+---------+---------+
|Y        |242753067|
|N        |32028188 |
+---------+---------+



### TRACE Data Filtering

In [22]:
when_issued_filter = F.col("wis_fl") != "Y"
special_filter = (F.col("spcl_trd_fl") != "Y") | F.col("spcl_trd_fl").isNull()
locked_in_filter = (F.col("lckd_in_ind") != "Y") | F.col("lckd_in_ind").isNull()
pre_2012_sub_filter = F.year("trd_exctn_dt") < 2012
days_to_settle_sub_filter = (F.col("days_to_sttl_ct") > 2) & F.col("days_to_sttl_ct").isNotNull()
pre_2012_days_to_settle_filter = ~(pre_2012_sub_filter & days_to_settle_sub_filter)
corporate_bond_filter = ~((F.col("scrty_type_cd")=="E") & F.col("scrty_type_cd").isNotNull())
secondary_market_filter = F.col("trdg_mkt_cd")=="S1"
disseminated_filter = F.col("dissem_fl")=="Y"

In [23]:
trace_df_filtered = trace_df.filter(
    when_issued_filter &
    special_filter &
    locked_in_filter &
    pre_2012_days_to_settle_filter &
    corporate_bond_filter &
    secondary_market_filter &
    disseminated_filter
)

### Statistics of Mergent features

In [28]:
mergent_df.count()

652658

In [31]:
for col in ["country_domicile","asset_backed","convertible","coupon_type","private_placement", "rule_144a", "sec_reg_type1","sec_reg_type2"]:
    mergent_df.groupBy(col).count().show(60, truncate=False)

+----------------+------+
|country_domicile|count |
+----------------+------+
|POL             |39    |
|JAM             |20    |
|BRA             |538   |
|FRA             |3165  |
|URY             |37    |
|ITA             |249   |
|IOT             |2     |
|HRV             |12    |
|BHS             |134   |
|GBR             |77439 |
|ROM             |24    |
|NULL            |1005  |
|BMU             |476   |
|AUS             |1297  |
|MLT             |7     |
|MEX             |629   |
|SVK             |8     |
|HUN             |27    |
|NZL             |91    |
|THA             |52    |
|NOR             |1189  |
|VEN             |72    |
|FIN             |167   |
|PER             |109   |
|NLD             |4138  |
|PAK             |14    |
|LUX             |754   |
|CYM             |2067  |
|TUR             |182   |
|AUT             |174   |
|USA             |495300|
|OMN             |22    |
|PAN             |152   |
|LBR             |35    |
|KOR             |587   |
|ZAF        

In [117]:
mergent_df.select("naics_code","sic_code").describe().show()

+-------+------------------+-----------------+
|summary|        naics_code|         sic_code|
+-------+------------------+-----------------+
|  count|            641191|           644130|
|   mean|308310.54020252934|5948.802502600407|
| stddev|227883.56665440486|843.0790110473203|
|    min|             11119|              100|
|    max|            928110|             9999|
+-------+------------------+-----------------+



In [28]:
mergent_df.select(F.sum(F.col("naics_code").isNull().cast("int"))).collect()[0][0]

11467

In [29]:
mergent_df.select(F.sum(F.col("sic_code").isNull().cast("int"))).collect()[0][0]

8528

In [18]:
mergent_df.groupBy("currency").count().show(60, truncate=False)

+--------+------+
|currency|count |
+--------+------+
|NZD     |65    |
|GBP     |643   |
|CZK     |87    |
|NULL    |645739|
|ARS     |8     |
|CAD     |2104  |
|ZAR     |109   |
|AUD     |121   |
|JPY     |347   |
|FRF     |5     |
|HKD     |31    |
|PHP     |3     |
|MXN     |13    |
|GRD     |20    |
|DEM     |57    |
|ITL     |51    |
|DKK     |19    |
|CHF     |77    |
|RUB     |31    |
|EUR     |2766  |
|PTE     |2     |
|NOK     |90    |
|CLP     |4     |
|LUF     |2     |
|PLN     |89    |
|SKK     |17    |
|SEK     |18    |
|SGD     |16    |
|HUF     |19    |
|NLG     |2     |
|LTL     |2     |
|COP     |5     |
|UYU     |9     |
|TRL     |1     |
|SIT     |1     |
|BRL     |53    |
|USD     |1     |
|EGP     |3     |
|ISK     |1     |
|INR     |5     |
|IDR     |4     |
|CNY     |3     |
|GHC     |1     |
|XEU     |9     |
|DOP     |1     |
|PEN     |1     |
|UZS     |1     |
|JMD     |2     |
+--------+------+



In [19]:
mergent_df.groupBy("denomination").count().show(60, truncate=False)

+------------+-----+
|denomination|count|
+------------+-----+
|25/1        |599  |
|1.254/1.2   |6    |
|100/5       |23204|
|200/1       |3659 |
|5000/5000   |4    |
|100/50      |7    |
|300/1       |5    |
|NULL        |672  |
|150/1       |386  |
|100/25      |1    |
|100/10      |102  |
|50/1        |706  |
|250/50      |52   |
|101/1       |2    |
|8/1         |1    |
|1000/10     |1    |
|10/10       |299  |
|1000/1000   |24   |
|5/5         |2819 |
|100/100     |242  |
|100000/1    |5    |
|1000/1      |35   |
|10/5        |86251|
|.5/.001     |1    |
|5000/100    |4    |
|25/5        |1    |
|15/1        |8    |
|1000/500    |1    |
|50/5        |7    |
|3/1         |3    |
|2/2         |31   |
|110/1       |4    |
|500/1       |41   |
|250/250     |45   |
|10/1        |13487|
|.1/.1       |9    |
|250/5       |34   |
|250/1       |699  |
|5/2         |1    |
|250/10      |18   |
|100/2       |53   |
|.025/.025   |2    |
|100/1       |57253|
|1000/100    |7    |
|2/.25       

In [20]:
mergent_df.groupBy("security_level").count().show(60, truncate=False)

+--------------+------+
|security_level|count |
+--------------+------+
|SEN           |531982|
|NON           |97157 |
|JUNS          |390   |
|SENS          |9212  |
|SS            |10785 |
|SUB           |2629  |
|NULL          |487   |
|JUN           |16    |
+--------------+------+



In [22]:
mergent_df.groupBy("interest_frequency").count().show(60, truncate=False)

+------------------+------+
|interest_frequency|count |
+------------------+------+
|-1                |179   |
|0                 |139372|
|99                |8660  |
|NULL              |1443  |
|1                 |6331  |
|4                 |120817|
|12                |82309 |
|13                |52    |
|2                 |291381|
|14                |1332  |
|15                |758   |
|16                |24    |
+------------------+------+



In [23]:
mergent_df.groupBy("pay_in_kind").count().show(60, truncate=False)

+-----------+------+
|pay_in_kind|count |
+-----------+------+
|Y          |546   |
|N          |441813|
|NULL       |210299|
+-----------+------+



In [24]:
mergent_df.groupBy("day_count_basis").count().show(60, truncate=False)

+---------------+------+
|day_count_basis|count |
+---------------+------+
|30/360         |590664|
|NULL           |4436  |
|ACT/360        |44554 |
|ACT/365        |2889  |
|ACT/ACT        |10088 |
|ACT/366        |27    |
+---------------+------+



In [27]:
mergent_df.groupBy("bond_type").count().show(60, truncate=False)

+---------+------+
|bond_type|count |
+---------+------+
|CMTZ     |114732|
|FGOV     |7277  |
|USNT     |1450  |
|TPCS     |1376  |
|PS       |1571  |
|PSTK     |1999  |
|IIDX     |106   |
|USSI     |1023  |
|CZ       |355   |
|USBL     |1968  |
|CCOV     |6186  |
|CLOC     |2     |
|USSP     |1362  |
|ASPZ     |3935  |
|ADEB     |157266|
|TXMU     |51    |
|USBN     |9594  |
|USBD     |178   |
|USTC     |126   |
|MBS      |77    |
|CMTN     |189969|
|UCID     |499   |
|CPIK     |392   |
|AMTN     |39904 |
|CDEB     |62381 |
|CCUR     |2730  |
|EMTN     |2450  |
|CPAS     |1778  |
|ADNT     |8734  |
|CTBD     |158   |
|BBON     |34    |
|RNT      |26118 |
|FGS      |130   |
|ARNT     |5865  |
|CS       |63    |
|CCPI     |3     |
|CTBL     |752   |
|ABS      |32    |
|EBON     |6     |
|O3Y      |3     |
|O4W      |3     |
|O5Y      |2     |
|O13W     |1     |
|O52W     |1     |
|CP       |15    |
|O26W     |1     |
+---------+------+



In [28]:
mergent_df.select(F.min("offering_date").alias("Earliest Date"),
                F.max("offering_date").alias("Latest Date")).show()

+-------------+-----------+
|Earliest Date|Latest Date|
+-------------+-----------+
|   1894-10-02| 2030-06-29|
+-------------+-----------+



In [29]:
mergent_df.select(F.sum(F.col("offering_date").isNull().cast("int"))).collect()[0][0]

12863

In [30]:
mergent_df.select(F.min("maturity").alias("Earliest Date"),
                F.max("maturity").alias("Latest Date")).show()

+-------------+-----------+
|Earliest Date|Latest Date|
+-------------+-----------+
|   1990-01-01| 2203-07-02|
+-------------+-----------+



In [31]:
mergent_df.select(F.sum(F.col("maturity").isNull().cast("int"))).collect()[0][0]

2368

In [32]:
mergent_df.select(F.min("dated_date").alias("Earliest Date"),
                F.max("dated_date").alias("Latest Date")).show()

+-------------+-----------+
|Earliest Date|Latest Date|
+-------------+-----------+
|   1894-07-01| 2030-11-13|
+-------------+-----------+



In [33]:
mergent_df.select(F.sum(F.col("dated_date").isNull().cast("int"))).collect()[0][0]

17828

In [114]:
mergent_df.groupBy("soft_call_make_whole").count().show(60, truncate=False)

+--------------------+------+
|soft_call_make_whole|count |
+--------------------+------+
|NULL                |607755|
|N                   |44833 |
|Y                   |70    |
+--------------------+------+



In [115]:
mergent_df.groupBy("convert_on_call").count().show(60, truncate=False)

+---------------+------+
|convert_on_call|count |
+---------------+------+
|NULL           |607361|
|Y              |810   |
|N              |44487 |
+---------------+------+



In [116]:
mergent_df.groupBy("conv_redemp_exception").count().show(60, truncate=False)

+---------------------+------+
|conv_redemp_exception|count |
+---------------------+------+
|NULL                 |606329|
|Y                    |1257  |
|N                    |45072 |
+---------------------+------+



In [117]:
mergent_df.groupBy("conv_period_spec").count().show(60, truncate=False)

+----------------+------+
|conv_period_spec|count |
+----------------+------+
|NULL            |606352|
|Y               |1327  |
|N               |44979 |
+----------------+------+



In [121]:
mergent_df.groupBy("percs").count().show(60, truncate=False)

+-----+------+
|percs|count |
+-----+------+
|NULL |607823|
|N    |44830 |
|Y    |5     |
+-----+------+



In [122]:
mergent_df.groupBy("peps").count().show(60, truncate=False)

+----+------+
|peps|count |
+----+------+
|NULL|607731|
|Y   |173   |
|N   |44754 |
+----+------+



In [123]:
mergent_df.groupBy("dilution_protection").count().show(60, truncate=False)

+-------------------+------+
|dilution_protection|count |
+-------------------+------+
|NULL               |586163|
|Y                  |66406 |
|N                  |89    |
+-------------------+------+



In [124]:
mergent_df.groupBy("conditional_conv_terms").count().show(60, truncate=False)

+----------------------+------+
|conditional_conv_terms|count |
+----------------------+------+
|NULL                  |606696|
|N                     |43927 |
|Y                     |2035  |
+----------------------+------+



In [125]:
mergent_df.groupBy("date_subj_adjustment").count().show(60, truncate=False)

+--------------------+------+
|date_subj_adjustment|count |
+--------------------+------+
|NULL                |652628|
|Y                   |11    |
|N                   |19    |
+--------------------+------+



In [126]:
mergent_df.groupBy("esop").count().show(60, truncate=False)

+----+------+
|esop|count |
+----+------+
|NULL|66    |
|Y   |603   |
|N   |651989|
+----+------+



In [127]:
mergent_df.groupBy("greater_of").count().show(60, truncate=False)

+----------+------+
|greater_of|count |
+----------+------+
|NULL      |499979|
|Y         |37    |
|N         |152642|
+----------+------+



In [128]:
mergent_df.groupBy("lesser_of").count().show(60, truncate=False)

+---------+------+
|lesser_of|count |
+---------+------+
|NULL     |499979|
|Y        |11    |
|N        |152668|
+---------+------+



In [144]:
mergent_df.groupBy("announced_call").count().show(60, truncate=False)

+--------------+------+
|announced_call|count |
+--------------+------+
|NULL          |868   |
|N             |651644|
|Y             |146   |
+--------------+------+



In [148]:
mergent_df.groupBy("putable").count().show(60, truncate=False)

+-------+------+
|putable|count |
+-------+------+
|Y      |2984  |
|N      |648506|
|NULL   |1168  |
+-------+------+



In [149]:
mergent_df.groupBy("redeemable").count().show(60, truncate=False)

+----------+------+
|redeemable|count |
+----------+------+
|Y         |389475|
|N         |263108|
|NULL      |75    |
+----------+------+



In [23]:
mergent_df.groupBy("bond_type").count().show(60, truncate=False)

+---------+------+
|bond_type|count |
+---------+------+
|CMTZ     |114732|
|FGOV     |7277  |
|USNT     |1450  |
|TPCS     |1376  |
|PS       |1571  |
|PSTK     |1999  |
|IIDX     |106   |
|USSI     |1023  |
|CZ       |355   |
|USBL     |1968  |
|CCOV     |6186  |
|CLOC     |2     |
|USSP     |1362  |
|ASPZ     |3935  |
|ADEB     |157266|
|TXMU     |51    |
|USBN     |9594  |
|USBD     |178   |
|USTC     |126   |
|MBS      |77    |
|CMTN     |189969|
|UCID     |499   |
|CPIK     |392   |
|AMTN     |39904 |
|CDEB     |62381 |
|CCUR     |2730  |
|EMTN     |2450  |
|CPAS     |1778  |
|ADNT     |8734  |
|CTBD     |158   |
|BBON     |34    |
|RNT      |26118 |
|FGS      |130   |
|ARNT     |5865  |
|CS       |63    |
|CCPI     |3     |
|CTBL     |752   |
|ABS      |32    |
|EBON     |6     |
|O3Y      |3     |
|O4W      |3     |
|O5Y      |2     |
|O13W     |1     |
|O52W     |1     |
|CP       |15    |
|O26W     |1     |
+---------+------+



#### Fixing NAICS cols

In [24]:
def fix_naics_to_int(naics):
    if naics is None:
        return None
    naics_str = str(naics)
    if len(naics_str) < 6:
        naics_str = naics_str.ljust(6, "0")
    return int(naics_str)
    
fix_naics_to_int_udf = udf(fix_naics_to_int, IntegerType())
mergent_df = mergent_df.withColumn("naics_code", fix_naics_to_int_udf(F.col("naics_code")))

mergent_df.select("naics_code").describe().show()

+-------+-----------------+
|summary|       naics_code|
+-------+-----------------+
|  count|           641191|
|   mean|509025.0714623256|
| stddev| 74510.3199017901|
|    min|           111140|
|    max|           928110|
+-------+-----------------+



### Mergent Data Filtering

In [25]:
us_issuer_filter = (F.col("country_domicile") == "USA") & F.col("country_domicile").isNotNull()
rule_144a_filter = F.col("rule_144a") == "N"
#bond_type_filter = F.col("bond_type").isNotNull() & F.col("bond_type").isin(["CDEB", "RNT", "CMTN", "UCID"])
private_placement_filter = (F.col("private_placement")=="N") & F.col("private_placement").isNotNull()
asset_backed_filter = (F.col("asset_backed") == "N") & F.col("asset_backed").isNotNull()
convertible_filter = (F.col("convertible") == "N") & F.col("convertible").isNotNull()
coupon_type_filter = (F.col("coupon_type") == "F") & F.col("coupon_type").isNotNull()
naics_null_filter = F.col("naics_code").isNotNull()
financials_naics_filter = (~F.col("naics_code").between(521000,525990))
utilities_naics_filter = (~F.col("naics_code").between(221000,221399))
public_naics_filter = (~F.col("naics_code").between(920000,929999))
education_naics_filter = (~F.col("naics_code").between(611000, 611999))
nonprofit_naics_filter = (~F.col("naics_code").between(813000, 813999))
senior_debt_filter = (F.col("security_level").isin(["SEN","SS"])) & F.col("security_level").isNotNull()
semi_annual_filter = (F.col("interest_frequency") == "2") & F.col("interest_frequency").isNotNull()
pay_in_kind_filter = ~((F.col("pay_in_kind") == "Y") & F.col("pay_in_kind").isNotNull())
day_count_filter = (F.col("day_count_basis") == "30/360") & F.col("day_count_basis").isNotNull()
#offering_date_filter = F.col("offering_date").between("1972-07-01","2024-03-28") & F.col("offering_date").isNotNull()
#maturity_date_filter = F.col("maturity").between("2002-07-01","2054-03-28") & F.col("maturity").isNotNull()
soft_call_make_whole_filter = ~((F.col("soft_call_make_whole") == "Y") & F.col("soft_call_make_whole").isNotNull())
convert_on_call_filter = ~((F.col("convert_on_call") == "Y") & F.col("convert_on_call").isNotNull())
conv_redemp_exception_filter = ~((F.col("conv_redemp_exception") == "Y") & F.col("conv_redemp_exception").isNotNull())
conv_period_spec_filter = ~((F.col("conv_period_spec") == "Y") & F.col("conv_period_spec").isNotNull())
percs_filter = ~((F.col("percs") == "Y") & F.col("percs").isNotNull())
peps_filter = ~((F.col("peps") == "Y") & F.col("peps").isNotNull())
dilution_protection_filter = ~((F.col("dilution_protection") == "Y") & F.col("dilution_protection").isNotNull())
conditional_conv_terms_filter = ~((F.col("conditional_conv_terms") == "Y") & F.col("conditional_conv_terms").isNotNull())
date_subj_adjustment_filter = ~((F.col("date_subj_adjustment") == "Y") & F.col("date_subj_adjustment").isNotNull())
esop_filter = ~((F.col("esop") == "Y") & F.col("esop").isNotNull()) 
greater_of_filter = ~((F.col("greater_of") == "Y") & F.col("greater_of").isNotNull())
lesser_of_filter = ~((F.col("lesser_of") == "Y") & F.col("lesser_of").isNotNull())
#announced_call_filter = ~((F.col("announced_call") == "Y") & F.col("announced_call").isNotNull())

In [26]:
mergent_df_filtered = mergent_df.filter(
    us_issuer_filter &
    rule_144a_filter &
    private_placement_filter &
    asset_backed_filter &
    convertible_filter &
    coupon_type_filter &
    naics_null_filter &
    financials_naics_filter &
    utilities_naics_filter &
    public_naics_filter &
    education_naics_filter &
    nonprofit_naics_filter &
    senior_debt_filter &
    semi_annual_filter &
    pay_in_kind_filter &
    day_count_filter &
    soft_call_make_whole_filter &
    convert_on_call_filter &
    conv_redemp_exception_filter &
    conv_period_spec_filter &
    percs_filter &
    peps_filter &
    dilution_protection_filter &
    conditional_conv_terms_filter &
    date_subj_adjustment_filter &
    esop_filter &
    greater_of_filter &
    lesser_of_filter
)

## Merging Filtered TRACE and Mergent

In [27]:
merged_trace_and_mergent_df = trace_df_filtered.join(
    mergent_df_filtered,
    trace_df_filtered["cusip_id"] == mergent_df_filtered["complete_cusip"],
    "inner"
)

In [28]:
mergent_cols_identifiers = ["issue_cusip","issuer_cusip", "complete_cusip","issue_id","issuer_id","cusip_name"]

trace_cols_to_keep = ["trd_exctn_dt", "trd_exctn_tm", "trd_rpt_dt", "pr_trd_dt", "stlmnt_dt", 
                      "rptd_pr","yld_pt", "entrd_vol_qt"]


mergent_cols_to_keep = ["coupon", "principal_amt", "dated_date", "first_interest_date", "last_interest_date", 
                        "offering_date", "maturity", "naics_code", "sic_code", "denomination", "bond_type"]

mergent_cols_special = ["defaulted", "redeemable", "putable", "soft_call_make_whole", "announced_call", "greater_of",
                        "lesser_of", "date_subj_adjustment", "esop"]

merged_trace_and_mergent_df_cols_to_keep = mergent_cols_identifiers + trace_cols_to_keep + mergent_cols_to_keep + mergent_cols_special

In [29]:
merged_trace_and_mergent_df = merged_trace_and_mergent_df.select(merged_trace_and_mergent_df_cols_to_keep)

In [30]:
output_dir = "merged_trace_and_mergent_data_temp"
merged_trace_and_mergent_df.repartition(1).write.mode("overwrite").parquet(output_dir)
parquet_files = [file for file in os.listdir(output_dir) if file.endswith(".parquet")]
if parquet_files:
    os.rename(f"{output_dir}/{parquet_files[0]}", "merged_trace_and_mergent_data.parquet")
shutil.rmtree(output_dir)

print("MERGED DATA SAVED AS PARQUET")

MERGED DATA SAVED AS PARQUET


In [65]:
trace_df_filtered.count()

236445895

In [66]:
mergent_df_filtered.count()

20231

In [67]:
len(merged_trace_and_mergent_df.columns)

34

In [68]:
merged_trace_and_mergent_df.count()

94436107

# Historical Duration Adjusted Bond Returns

## Data Preparation

### Obtaining Schema for and Loading Merged Data 

In [39]:
merged_df_temp = spark.read.format("parquet").load("merged_trace_and_mergent_data.parquet")
merged_df_schema, merged_df_cols_by_dataset = get_spark_schema_2(merged_df_temp.columns,True)
del merged_df_temp
merged_df = spark.read.format("parquet").schema(merged_df_schema).load("merged_trace_and_mergent_data.parquet")

In [40]:
merged_df_schema

StructType([StructField('issue_cusip', StringType(), True), StructField('issuer_cusip', StringType(), True), StructField('complete_cusip', StringType(), True), StructField('issue_id', FloatType(), True), StructField('issuer_id', FloatType(), True), StructField('cusip_name', StringType(), True), StructField('trd_exctn_dt', DateType(), True), StructField('trd_exctn_tm', TimestampType(), True), StructField('trd_rpt_dt', DateType(), True), StructField('pr_trd_dt', DateType(), True), StructField('stlmnt_dt', DateType(), True), StructField('rptd_pr', FloatType(), True), StructField('yld_pt', FloatType(), True), StructField('entrd_vol_qt', FloatType(), True), StructField('coupon', FloatType(), True), StructField('principal_amt', FloatType(), True), StructField('dated_date', DateType(), True), StructField('first_interest_date', DateType(), True), StructField('last_interest_date', DateType(), True), StructField('offering_date', DateType(), True), StructField('maturity', DateType(), True), Struc

In [41]:
merged_df_cols_by_dataset

{'trace': ['trd_rpt_dt',
  'trd_exctn_dt',
  'trd_exctn_tm',
  'rptd_pr',
  'entrd_vol_qt',
  'pr_trd_dt',
  'yld_pt',
  'stlmnt_dt'],
 'mergent': ['date_subj_adjustment',
  'first_interest_date',
  'redeemable',
  'issue_id',
  'offering_date',
  'cusip_name',
  'bond_type',
  'greater_of',
  'dated_date',
  'coupon',
  'sic_code',
  'maturity',
  'denomination',
  'issuer_cusip',
  'last_interest_date',
  'naics_code',
  'announced_call',
  'defaulted',
  'soft_call_make_whole',
  'lesser_of',
  'issue_cusip',
  'putable',
  'principal_amt',
  'issuer_id',
  'complete_cusip',
  'esop']}

### Initial Statistics for Features in Merged Data

In [42]:
row_count_1 = merged_df.count()
for col in merged_df.columns:
    temp_null_count = merged_df.select(F.sum(F.col(col).isNull().cast("int"))).collect()[0][0]
    if temp_null_count != 0:
        print(f"Column: {col}, Null Count Ratio: {temp_null_count/row_count_1:.3f}")

Column: pr_trd_dt, Null Count Ratio: 0.995
Column: stlmnt_dt, Null Count Ratio: 0.212
Column: yld_pt, Null Count Ratio: 0.035
Column: last_interest_date, Null Count Ratio: 0.000
Column: offering_date, Null Count Ratio: 0.000
Column: sic_code, Null Count Ratio: 0.000
Column: putable, Null Count Ratio: 0.000
Column: soft_call_make_whole, Null Count Ratio: 1.000
Column: greater_of, Null Count Ratio: 1.000
Column: lesser_of, Null Count Ratio: 1.000
Column: date_subj_adjustment, Null Count Ratio: 1.000
Column: esop, Null Count Ratio: 0.000


In [43]:
initial_merged_df_count = merged_df.count()

In [44]:
merged_df.select("rptd_pr").describe().show()

+-------+------------------+
|summary|           rptd_pr|
+-------+------------------+
|  count|          99243036|
|   mean| 99.97559905017782|
| stddev|13.286501454414692|
|    min|         -0.434444|
|    max|            6347.0|
+-------+------------------+



In [45]:
merged_df.select("principal_amt").describe().show()

+-------+------------------+
|summary|     principal_amt|
+-------+------------------+
|  count|          99243036|
|   mean|1021.7321779535241|
| stddev| 1215.424271214306|
|    min|               0.0|
|    max|          250000.0|
+-------+------------------+



In [46]:
merged_df.select("entrd_vol_qt").describe().show()

+-------+------------------+
|summary|      entrd_vol_qt|
+-------+------------------+
|  count|          99243036|
|   mean|449053.60056237556|
| stddev|2594240.5739374557|
|    min|               0.1|
|    max|           7.445E9|
+-------+------------------+



In [47]:
merged_df.groupBy("denomination").count().show(60, truncate=False)

+------------+--------+
|denomination|count   |
+------------+--------+
|25/1        |12822   |
|200/1       |43160   |
|100/50      |265     |
|50/1        |11871   |
|10/10       |5781    |
|5/5         |17024   |
|100/100     |852     |
|10/1        |104873  |
|2000/10     |531     |
|250/1       |3572    |
|100/1       |566332  |
|75/1        |1804    |
|2/1         |69005164|
|5/1         |339719  |
|1/1         |29129158|
|`           |108     |
+------------+--------+



### Additional Filters

In [48]:
# remove rows will null values for important cols
rptd_pr_null_filter = F.col("rptd_pr").isNotNull()
yld_pt_null_filter = F.col("yld_pt").isNotNull()
entrd_vol_qty_null_filter = F.col("entrd_vol_qt").isNotNull()
last_interest_date_null_filter = F.col("last_interest_date").isNotNull()

# remove rows with strange price values (not in percentage terms of face value)
negative_price_filter = F.col("rptd_pr") > 0

# extreme value filters
#price_q1, price_q99 = merged_df.approxQuantile("rptd_pr", [0.00, 0.90], 0.05)
#price_filter = F.col("rptd_pr").between(price_q1, price_q99)

#principal_q1, principal_q99 = merged_df.approxQuantile("principal_amt", [0.10, 0.90], 0.05)
#principal_filter = F.col("principal_amt").between(principal_q1, principal_q99)

#volume_q1, volume_q99 = merged_df.approxQuantile("entrd_vol_qt", [0.00, 0.90], 0.05)
#volume_filter = F.col("entrd_vol_qt").between(volume_q1, volume_q99)

#minimum volume filter (>10000)
min_volume_filter = F.col("entrd_vol_qt") > 10000

# remove rows where days to settlement is greater than 2
days_to_settle_filter_2 = ~((F.col("trd_exctn_dt").isNotNull()) &\
                         (F.col("stlmnt_dt").isNotNull()) &\
                         (F.datediff(F.col("stlmnt_dt"),F.col("trd_exctn_dt")) > 2) &\
                         (F.year(F.col("stlmnt_dt"))<2012))


# remove rows where traded notional is greater than $10,000
traded_notional_filter = (F.col("entrd_vol_qt") * (F.col("rptd_pr")/100) * F.col("principal_amt")) > 10000

#remove rows where years to maturity is not between 1 - 30 years (using 30/360 daycount basis)
years_to_maturity = (((F.year(F.col("maturity")) - F.year(F.col("trd_exctn_dt"))) * 360 + \
                    (F.month(F.col("maturity")) - F.month(F.col("trd_exctn_dt"))) * 30 + \
                    (F.dayofmonth(F.col("maturity")) - F.dayofmonth(F.col("trd_exctn_dt")))) / 360)
years_to_maturity_filter = years_to_maturity.between(1,30)

In [49]:
merged_df_2 = merged_df.filter(
    rptd_pr_null_filter &
    yld_pt_null_filter &
    entrd_vol_qty_null_filter &
    last_interest_date_null_filter &
    negative_price_filter &
    days_to_settle_filter_2 &
    years_to_maturity_filter &
    min_volume_filter
    #price_filter &
    #principal_filter &
    #volume_filter
)

In [50]:
merged_df_2.select("entrd_vol_qt").describe().show()

+-------+------------------+
|summary|      entrd_vol_qt|
+-------+------------------+
|  count|          60351669|
|   mean| 638207.8609670926|
| stddev|3053530.8489947924|
|    min|           10002.0|
|    max|           7.445E9|
+-------+------------------+



In [51]:
merged_df_2.select("principal_amt").describe().show()

+-------+------------------+
|summary|     principal_amt|
+-------+------------------+
|  count|          60351669|
|   mean|1018.2700878578851|
| stddev|1293.6017746361094|
|    min|               0.0|
|    max|          250000.0|
+-------+------------------+



In [52]:
merged_df_2.select("rptd_pr").describe().show()

+-------+------------------+
|summary|           rptd_pr|
+-------+------------------+
|  count|          60351669|
|   mean| 100.3394766528903|
| stddev|12.612910353369179|
|    min|            1.0E-6|
|    max|            6347.0|
+-------+------------------+



In [53]:
row_count_1 = merged_df_2.count()
for col in merged_df.columns:
    temp_null_count = merged_df_2.select(F.sum(F.col(col).isNull().cast("int"))).collect()[0][0]
    if temp_null_count != 0:
        print(f"Column: {col}, Null Count Ratio: {temp_null_count/row_count_1:.3f}")

Column: pr_trd_dt, Null Count Ratio: 0.994
Column: stlmnt_dt, Null Count Ratio: 0.217
Column: offering_date, Null Count Ratio: 0.000
Column: sic_code, Null Count Ratio: 0.000
Column: putable, Null Count Ratio: 0.000
Column: soft_call_make_whole, Null Count Ratio: 1.000
Column: greater_of, Null Count Ratio: 1.000
Column: lesser_of, Null Count Ratio: 1.000
Column: date_subj_adjustment, Null Count Ratio: 1.000
Column: esop, Null Count Ratio: 0.000


In [54]:
portion_of_merged_df_retained = merged_df_2.count() / initial_merged_df_count
portion_of_merged_df_retained

0.6081199390151668

In [55]:
estimated_transactions_per_day = merged_df_2.count()/(22*252)
int(estimated_transactions_per_day)

10885

### Saving Filtered Merged DF

In [56]:
output_dir = "merged_trace_and_mergent_data_temp"
merged_df_2.repartition(1).write.mode("overwrite").parquet(output_dir)
parquet_files = [file for file in os.listdir(output_dir) if file.endswith(".parquet")]
if parquet_files:
    os.rename(f"{output_dir}/{parquet_files[0]}", "total_bond_transaction_data.parquet")
shutil.rmtree(output_dir)

print("MERGED DATA SAVED AS PARQUET")

MERGED DATA SAVED AS PARQUET


## Monthly Bond Return Calculation

### Daily Price Calculation

#### Loading Filtered Merged DF

In [57]:
trade_df_temp = spark.read.format("parquet").load("total_bond_transaction_data.parquet")
trade_df_schema, trade_df_cols_by_dataset = get_spark_schema_2(trade_df_temp.columns,True)
del trade_df_temp
trade_df = spark.read.format("parquet").schema(trade_df_schema).load("total_bond_transaction_data.parquet")

In [58]:
trade_df_cols_by_dataset

{'trace': ['trd_rpt_dt',
  'trd_exctn_dt',
  'trd_exctn_tm',
  'rptd_pr',
  'entrd_vol_qt',
  'pr_trd_dt',
  'yld_pt',
  'stlmnt_dt'],
 'mergent': ['date_subj_adjustment',
  'first_interest_date',
  'redeemable',
  'issue_id',
  'offering_date',
  'cusip_name',
  'bond_type',
  'greater_of',
  'dated_date',
  'coupon',
  'sic_code',
  'maturity',
  'denomination',
  'issuer_cusip',
  'last_interest_date',
  'naics_code',
  'announced_call',
  'defaulted',
  'soft_call_make_whole',
  'lesser_of',
  'issue_cusip',
  'putable',
  'principal_amt',
  'issuer_id',
  'complete_cusip',
  'esop']}

#### Daily Aggregation

In [59]:
agg_dict_1 = {}
agg_dict_1["daily_price"] = F.sum(F.col("rptd_pr") * F.col("entrd_vol_qt")) / F.sum(F.col("entrd_vol_qt"))
agg_dict_1["daily_volume"] = F.sum(F.col("entrd_vol_qt"))
agg_dict_1["trace_yield"] = F.first(F.col("yld_pt"))
for col in list(set(trade_df_cols_by_dataset["mergent"]) - set(["complete_cusip"])):
    agg_dict_1[col] = F.first(F.col(col))
daily_trade_df = trade_df.groupBy("complete_cusip","trd_exctn_dt").agg(
    *[func.alias(col) for col,func in agg_dict_1.items()]
)

#### Saving Daily Trade Data

In [60]:
output_dir = "daily_trade_data_temp"
daily_trade_df.repartition(1).write.mode("overwrite").parquet(output_dir)
parquet_files = [file for file in os.listdir(output_dir) if file.endswith(".parquet")]
if parquet_files:
    os.rename(f"{output_dir}/{parquet_files[0]}", "daily_bond_trade_data.parquet")
shutil.rmtree(output_dir)

print("DAILY DATA SAVED AS PARQUET")

DAILY DATA SAVED AS PARQUET


### Monthly Price Calculation

#### ReLoading Daily Price Data (Pandas)

In [61]:
daily_trade_df = pd.read_parquet("daily_bond_trade_data.parquet")

In [62]:
daily_trade_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 10327114 entries, 0 to 10327113
Data columns (total 30 columns):
 #   Column                Dtype  
---  ------                -----  
 0   complete_cusip        object 
 1   trd_exctn_dt          object 
 2   daily_price           float64
 3   daily_volume          float64
 4   trace_yield           float32
 5   date_subj_adjustment  object 
 6   maturity              object 
 7   first_interest_date   object 
 8   denomination          object 
 9   issuer_cusip          object 
 10  last_interest_date    object 
 11  naics_code            int32  
 12  announced_call        object 
 13  defaulted             object 
 14  redeemable            object 
 15  issue_id              float32
 16  offering_date         object 
 17  soft_call_make_whole  object 
 18  cusip_name            object 
 19  lesser_of             object 
 20  bond_type             object 
 21  greater_of            object 
 22  dated_date            object 
 23  coupo

#### Adjusting Dtypes

In [63]:
daily_trade_df_pandas_dtypes = {
    "complete_cusip": "string",
    "trd_exctn_dt": "datetime64[ns]",
    "daily_price": "float64",
    "daily_volume": "float64",
    "trace_yield": "float64",
    "esop": "string",
    "last_interest_date": "datetime64[ns]",
    "principal_amt": "float32",
    "offering_date": "datetime64[ns]",
    "announced_call": "string",
    "issuer_cusip": "string",
    "denomination": "string",
    "bond_type": "string",
    "date_subj_adjustment": "string",
    "naics_code": "int32",
    "putable": "string",
    "defaulted": "string",
    "soft_call_make_whole": "string",
    "maturity": "datetime64[ns]",
    "cusip_name": "string",
    "issuer_id": "float32",
    "dated_date": "datetime64[ns]",
    "lesser_of": "string",
    "greater_of": "string",
    "coupon": "float32",
    "issue_cusip": "string",
    "first_interest_date": "datetime64[ns]",
    "issue_id": "float32",
    "redeemable": "string",
}

In [64]:
for col, dtype in daily_trade_df_pandas_dtypes.items():
    print(col)
    daily_trade_df[col] = daily_trade_df[col].astype(dtype)

complete_cusip
trd_exctn_dt
daily_price
daily_volume
trace_yield
esop
last_interest_date
principal_amt
offering_date
announced_call
issuer_cusip
denomination
bond_type
date_subj_adjustment
naics_code
putable
defaulted
soft_call_make_whole
maturity
cusip_name
issuer_id
dated_date
lesser_of
greater_of
coupon
issue_cusip
first_interest_date
issue_id
redeemable


#### Monthly Aggregation 1 - Daily to Monthly

In [65]:
def get_holiday_list(year):
    if year > 2020:
        market_holidays = [
            f"{year}-01-01", f"{year}-01-15", f"{year}-02-19", f"{year}-05-27",
            f"{year}-06-19", f"{year}-07-04", f"{year}-09-02", f"{year}-11-28", f"{year}-12-25"
        ]
    else:
        market_holidays = [
            f"{year}-01-01", f"{year}-01-15", f"{year}-02-19", f"{year}-05-27",
            f"{year}-07-04", f"{year}-09-02", f"{year}-11-28", f"{year}-12-25"
        ]
    return market_holidays

In [66]:
holiday_dict = {year: get_holiday_list(year) for year in range(2002,2025)}
holiday_list = [*holiday_dict[2002], *holiday_dict[2003], *holiday_dict[2004],
                *holiday_dict[2005], *holiday_dict[2006], *holiday_dict[2007],
                *holiday_dict[2008], *holiday_dict[2009], *holiday_dict[2010],
                *holiday_dict[2011], *holiday_dict[2012], *holiday_dict[2013],
                *holiday_dict[2014], *holiday_dict[2015], *holiday_dict[2016],
                *holiday_dict[2017], *holiday_dict[2018], *holiday_dict[2019],
                *holiday_dict[2020], *holiday_dict[2021], *holiday_dict[2022],
                *holiday_dict[2023], *holiday_dict[2024]
               ]
trading_bdays = CustomBusinessDay(holidays=holiday_list)

In [67]:
daily_trade_df["month_end_date"] = daily_trade_df["trd_exctn_dt"] + MonthEnd(0)
valid_trade_df_bdays = pd.bdate_range(start=daily_trade_df["month_end_date"].min(),
                                      end=daily_trade_df["month_end_date"].max(),
                                      freq=trading_bdays
                                     )
non_trade_df_bdays_mask = ~daily_trade_df["month_end_date"].isin(valid_trade_df_bdays)
daily_trade_df.loc[non_trade_df_bdays_mask, "month_end_date"] = daily_trade_df.loc[non_trade_df_bdays_mask, "month_end_date"].apply(trading_bdays.rollback)

In [68]:
daily_trade_df_columns = daily_trade_df.columns
daily_trade_df_apply_first_agg_cols = list(set(daily_trade_df_columns) - set(["trd_exctn_dt","daily_price", "daily_volume","trace_yield", "month_end_date","complete_cusip"]))

In [69]:
from functools import partial

def monthly_agg_1(group, apply_first_cols):

    first_row = group.iloc[0]
    last_row = group.iloc[-1]
    
    end_of_month_bday = first_row["month_end_date"]
    start_of_month_bday = trading_bdays.rollforward(end_of_month_bday - MonthBegin(1))
    
    last_5_bdays = frozenset(pd.bdate_range(end=end_of_month_bday, periods=5, freq=trading_bdays))
    first_5_bdays = frozenset(pd.bdate_range(start=start_of_month_bday, periods=5, freq=trading_bdays))
    
    return_dict = dict(
        price_last_5_bdays = np.nan,
        volume_last_5_bdays = np.nan,
        trace_yield_last_5_bdays = np.nan,
        trd_exctn_dt_last_5_bdays = np.nan,
        price_first_5_bdays = np.nan,
        volume_first_5_bdays = np.nan,
        trace_yield_first_5_bdays = np.nan,
        trd_exctn_dt_first_5_bdays = np.nan,
    )

    if last_row["trd_exctn_dt"] in last_5_bdays:
        return_dict["price_last_5_bdays"] = last_row["daily_price"]
        return_dict["volume_last_5_bdays"] = last_row["daily_volume"]
        return_dict["trace_yield_last_5_bdays"] = last_row["trace_yield"]
        return_dict["trd_exctn_dt_last_5_bdays"] = last_row["trd_exctn_dt"]
        
    if first_row["trd_exctn_dt"] in first_5_bdays:
        return_dict["price_first_5_bdays"] = first_row["daily_price"]
        return_dict["volume_first_5_bdays"] = first_row["daily_volume"]
        return_dict["trace_yield_first_5_bdays"] = last_row["trace_yield"]
        return_dict["trd_exctn_dt_first_5_bdays"] = first_row["trd_exctn_dt"]

    return_dict.update(first_row[apply_first_cols].to_dict())

    return pd.Series(return_dict)

monthly_agg_1_with_cols = partial(monthly_agg_1, apply_first_cols=daily_trade_df_apply_first_agg_cols)

In [70]:
monthly_trade_df = daily_trade_df.groupby(["complete_cusip", "month_end_date"]).apply(
    monthly_agg_1_with_cols
).reset_index()

  monthly_trade_df = daily_trade_df.groupby(["complete_cusip", "month_end_date"]).apply(


#### Saving Monthly Aggregation 1

In [71]:
monthly_trade_df.to_parquet("monthly_bond_trade_data_1.parquet", index=False)

#### ReLoading Monthly Price Data 1

In [67]:
monthly_trade_df = pd.read_parquet("monthly_bond_trade_data_1.parquet")

#### Monthly Aggregation 2 & 3 - Choosing Representative Month End Price

In [73]:
monthly_trade_df_cols = list(monthly_trade_df.columns)

In [74]:
monthly_trade_df_cols

['complete_cusip',
 'month_end_date',
 'price_last_5_bdays',
 'volume_last_5_bdays',
 'trace_yield_last_5_bdays',
 'trd_exctn_dt_last_5_bdays',
 'price_first_5_bdays',
 'volume_first_5_bdays',
 'trace_yield_first_5_bdays',
 'trd_exctn_dt_first_5_bdays',
 'date_subj_adjustment',
 'maturity',
 'first_interest_date',
 'denomination',
 'issuer_cusip',
 'last_interest_date',
 'naics_code',
 'announced_call',
 'defaulted',
 'redeemable',
 'issue_id',
 'offering_date',
 'soft_call_make_whole',
 'cusip_name',
 'lesser_of',
 'bond_type',
 'greater_of',
 'dated_date',
 'coupon',
 'issue_cusip',
 'putable',
 'principal_amt',
 'issuer_id',
 'sic_code',
 'esop']

In [75]:
monthly_trade_df_nonstatic_cols_1 = frozenset(["price_last_5_bdays", "volume_last_5_bdays", "trace_yield_last_5_bdays", "trd_exctn_dt_last_5_bdays",
                                               "price_first_5_bdays", "volume_first_5_bdays", "trace_yield_first_5_bdays", "trd_exctn_dt_first_5_bdays",
                                              ])
monthly_trade_df_nonstatic_cols_2 = frozenset(["price_last_5_bdays", "volume_last_5_bdays", "trace_yield_last_5_bdays", "trd_exctn_dt_last_5_bdays",
                                               "price_first_5_bdays", "volume_first_5_bdays", "trace_yield_first_5_bdays", "trd_exctn_dt_first_5_bdays",
                                               "complete_cusip", "month_end_date"
                                              ])
monthly_trade_df_static_cols_1 = [x for x in monthly_trade_df_cols if x not in monthly_trade_df_nonstatic_cols_1]
monthly_trade_df_static_cols_2 = [x for x in monthly_trade_df_cols if x not in monthly_trade_df_nonstatic_cols_2]

In [76]:
#groupby complete_cusip only
def monthly_agg_2(group, apply_ffill_cols):
    
    start_date = group["month_end_date"].iloc[0]
    end_date = group["month_end_date"].iloc[-1]
    group = group.set_index("month_end_date")
    full_date_index = pd.DatetimeIndex(pd.Series(pd.date_range(start=start_date, 
                                                               end=end_date, 
                                                               freq="ME")).apply(trading_bdays.rollback))
    group_reindexed = group.reindex(full_date_index)
    group_reindexed = group_reindexed.reset_index().rename(columns={"index":"month_end_date"})
    group_reindexed.loc[:, apply_ffill_cols] = group_reindexed[apply_ffill_cols].fillna(method="ffill")
    
    group_reindexed.loc[:,"price_first_5_bdays"] = group_reindexed["price_first_5_bdays"].shift(-1)
    group_reindexed.loc[:,"volume_first_5_bdays"] = group_reindexed["volume_first_5_bdays"].shift(-1)
    group_reindexed.loc[:,"trace_yield_first_5_bdays"] = group_reindexed["trace_yield_first_5_bdays"].shift(-1)
    group_reindexed.loc[:,"trd_exctn_dt_first_5_bdays"] = group_reindexed["trd_exctn_dt_first_5_bdays"].shift(-1)
    
    return group_reindexed

monthly_agg_2_with_cols = partial(monthly_agg_2, apply_ffill_cols=monthly_trade_df_static_cols_1)

#groupby complete_cusip and month_end_date
def monthly_agg_3(group, apply_first_cols):

    group_row = group.iloc[0]
    return_dict = dict(
        price = np.nan,
        volume = np.nan,
        trace_yield = np.nan,
        actual_exctn_dt = np.nan
    )
    
    month_end_date = group_row["month_end_date"]
    next_month_end_date = month_end_date + pd.DateOffset(months=1)
    next_month_exctn_dt = group_row["trd_exctn_dt_first_5_bdays"]
    curr_month_exctn_dt = group_row["trd_exctn_dt_last_5_bdays"]
    
    if curr_month_exctn_dt != np.nan:
        
        return_dict["price"] = group_row["price_last_5_bdays"]
        return_dict["volume"] = group_row["volume_last_5_bdays"]
        return_dict["trace_yield"] = group_row["trace_yield_last_5_bdays"]
        return_dict["actual_exctn_dt"] = group_row["trd_exctn_dt_last_5_bdays"]
        
    else:
        
        if next_month_exctn_dt != np.nan:
            
            next_month_exctn_dt_is_subsequent = ((next_month_end_date.month == next_month_exctn_dt.month) & 
                                                 (next_month_end_date.year == next_month_exctn_dt.year))
            if next_month_exctn_dt_is_subsequent:
                
                return_dict["price"] = group_row["price_first_5_bdays"]
                return_dict["volume"] = group_row["volume_first_5_bdays"]
                return_dict["trace_yield"] = group_row["trace_yield_first_5_bdays"]
                return_dict["actual_exctn_dt"] = group_row["trd_exctn_dt_first_5_bdays"]

    return_dict.update(group_row[apply_first_cols].to_dict())

    return pd.Series(return_dict)

monthly_agg_3_with_cols = partial(monthly_agg_3, apply_first_cols=monthly_trade_df_static_cols_2)
        

In [77]:
monthly_trade_df_2 = monthly_trade_df.groupby("complete_cusip").apply(
    monthly_agg_2_with_cols
).reset_index(drop=True)

  monthly_trade_df_2 = monthly_trade_df.groupby("complete_cusip").apply(


In [78]:
monthly_trade_df_3 = monthly_trade_df_2.groupby(["complete_cusip", "month_end_date"]).apply(
    monthly_agg_3_with_cols
).reset_index()

  monthly_trade_df_3 = monthly_trade_df_2.groupby(["complete_cusip", "month_end_date"]).apply(


#### Saving Monthly Aggregation 2 & 3

In [79]:
monthly_trade_df_3.to_parquet("monthly_bond_trade_data_2.parquet", index=False)

In [80]:
monthly_trade_df_2.isna().sum()

month_end_date                     0
complete_cusip                     0
price_last_5_bdays            743116
volume_last_5_bdays           743116
trace_yield_last_5_bdays      743116
trd_exctn_dt_last_5_bdays     743116
price_first_5_bdays           735588
volume_first_5_bdays          735588
trace_yield_first_5_bdays     735588
trd_exctn_dt_first_5_bdays    735588
date_subj_adjustment          994123
maturity                           0
first_interest_date                0
denomination                       0
issuer_cusip                       0
last_interest_date                 0
naics_code                         0
announced_call                     0
defaulted                          0
redeemable                         0
issue_id                           0
offering_date                    109
soft_call_make_whole          994062
cusip_name                         0
lesser_of                     994047
bond_type                          0
greater_of                    994047
d

In [81]:
monthly_trade_df_3.isna().sum()

complete_cusip               0
month_end_date               0
price                   743116
volume                  743116
trace_yield             743116
actual_exctn_dt         743116
date_subj_adjustment    994123
maturity                     0
first_interest_date          0
denomination                 0
issuer_cusip                 0
last_interest_date           0
naics_code                   0
announced_call               0
defaulted                    0
redeemable                   0
issue_id                     0
offering_date              109
soft_call_make_whole    994062
cusip_name                   0
lesser_of               994047
bond_type                    0
greater_of              994047
dated_date                   0
coupon                       0
issue_cusip                  0
putable                     23
principal_amt                0
issuer_id                    0
sic_code                   275
esop                       819
dtype: int64

### Monthly Dirty Price Calculation (Accrued Interest + Coupon)

#### ReLoading Monthly Price Data 2

In [69]:
monthly_df = pd.read_parquet("monthly_bond_trade_data_2.parquet")

In [26]:
def get_holiday_list(year):
    if year > 2020:
        market_holidays = [
            f"{year}-01-01", f"{year}-01-15", f"{year}-02-19", f"{year}-05-27",
            f"{year}-06-19", f"{year}-07-04", f"{year}-09-02", f"{year}-11-28", f"{year}-12-25"
        ]
    else:
        market_holidays = [
            f"{year}-01-01", f"{year}-01-15", f"{year}-02-19", f"{year}-05-27",
            f"{year}-07-04", f"{year}-09-02", f"{year}-11-28", f"{year}-12-25"
        ]
    return market_holidays

holiday_dict = {year: get_holiday_list(year) for year in range(2002,2025)}
holiday_list = [*holiday_dict[2002], *holiday_dict[2003], *holiday_dict[2004],
                *holiday_dict[2005], *holiday_dict[2006], *holiday_dict[2007],
                *holiday_dict[2008], *holiday_dict[2009], *holiday_dict[2010],
                *holiday_dict[2011], *holiday_dict[2012], *holiday_dict[2013],
                *holiday_dict[2014], *holiday_dict[2015], *holiday_dict[2016],
                *holiday_dict[2017], *holiday_dict[2018], *holiday_dict[2019],
                *holiday_dict[2020], *holiday_dict[2021], *holiday_dict[2022],
                *holiday_dict[2023], *holiday_dict[2024]
               ]
trading_bdays = CustomBusinessDay(holidays=holiday_list)

#### Accrued Interest and Coupon Calculation

In [71]:
def accrued_interest_calculation(row, periodicity, asof_date_col):
    
    days_per_period_dict = {
        2: 180,
        4: 90,
        6: 60,
        12: 30
    }
    
    acc_int = 0
    
    if (row[asof_date_col] > row["dated_date"]) and (row[asof_date_col] <= row["maturity"]):
        
        year_days_elapsed = (row[asof_date_col].year - row["dated_date"].year) * 360
        month_days_elapsed = (row[asof_date_col].month - row["dated_date"].month) * 30
        day_days_elapsed = (row[asof_date_col].day - row["dated_date"].day)
        prds_elapsed = (year_days_elapsed + month_days_elapsed + day_days_elapsed) / days_per_period_dict[periodicity]
        whole_prds_elapsed = int(prds_elapsed)
        frac_prds_elapsed = prds_elapsed - whole_prds_elapsed
        acc_int = (row["coupon"]/(periodicity*100)) * row["principal_amt"]
        
        if frac_prds_elapsed != 0:
            
            days_since_last_pymnt = int(frac_prds_elapsed * days_per_period_dict[periodicity])
            acc_int *= days_since_last_pymnt / days_per_period_dict[periodicity]
            
    return acc_int
            

In [72]:
monthly_df.loc[:,"clean_price"] = monthly_df["price"] * monthly_df["principal_amt"] / 100

In [73]:
monthly_df.loc[:,"accrued_interest"] = monthly_df.apply(
    accrued_interest_calculation,
    args=(2,"month_end_date"),
    axis=1
)

In [74]:
monthly_df["accrued_interest"].describe()

count    994123.000000
mean         16.356254
std         102.973884
min           0.000000
25%           5.921528
50%          12.266667
75%          20.519444
max       10242.187500
Name: accrued_interest, dtype: float64

#### Dirty Price Calculation

In [75]:
monthly_df.loc[:, "dirty_price"] = monthly_df["clean_price"] + monthly_df["accrued_interest"]

In [76]:
monthly_df["bond_type"].value_counts()

bond_type
CDEB    891309
RNT      52248
CMTN     47675
UCID      1826
FGOV       500
CCOV       244
ABS        146
CPAS        96
USBN        49
ASPZ        30
Name: count, dtype: int64

In [77]:
monthly_df["volume"].describe()

count    2.510070e+05
mean     3.152631e+06
std      1.001706e+07
min      1.038450e+04
25%      1.000000e+05
50%      4.830000e+05
75%      2.270000e+06
max      6.668040e+08
Name: volume, dtype: float64

In [78]:
monthly_df[(monthly_df["principal_amt"]!=1000)]["dirty_price"].describe()

count       708.000000
mean      37423.542988
std       72005.170669
min           0.000000
25%        2069.415905
50%        4093.123994
75%        6441.844783
max      322994.791667
Name: dirty_price, dtype: float64

In [79]:
monthly_df[(monthly_df["principal_amt"]==1000)]["dirty_price"].describe()

count    250299.000000
mean       1046.865098
std         144.052615
min           3.224028
25%         991.454623
50%        1042.061388
75%        1108.718387
max        9757.737089
Name: dirty_price, dtype: float64

#### Interpolating Dirty Price by Time

In [143]:
def dirtyPriceInterpolation(group):
    
    group.loc[:,"dirty_price"] = group.set_index("month_end_date")["dirty_price"].interpolate(method="time").values


    return group

In [144]:
interp_monthly_df = monthly_df.groupby("complete_cusip", group_keys=False).apply(
    dirtyPriceInterpolation
)

  interp_monthly_df = monthly_df.groupby("complete_cusip", group_keys=False).apply(


#### Bond Return Calculation

In [145]:
interp_monthly_df["bond_return"] = interp_monthly_df.groupby("complete_cusip")["dirty_price"].transform(
    lambda col: (col / col.shift()) - 1
)

#### Bond Type and Face Value Filtering

In [147]:
desired_bond_type_filter = interp_monthly_df["bond_type"].isin(["CDEB", "RNT", "CMTN", "UCID"])
face_value_filter = interp_monthly_df["principal_amt"] == 1000

In [148]:
filt1_monthly_df = interp_monthly_df[desired_bond_type_filter & face_value_filter].reset_index(drop=True)

In [149]:
len(filt1_monthly_df) / len(interp_monthly_df)

0.9956796090624601

In [150]:
avg_months_per_bond = int(filt1_monthly_df[filt1_monthly_df["dirty_price"].notna()].groupby("complete_cusip")["dirty_price"].count().max())
avg_months_per_bond

260

In [151]:
filt1_monthly_df["dirty_price"].describe()

count    941581.000000
mean       1054.947253
std         148.142736
min           3.224028
25%         994.337629
50%        1047.289275
75%        1119.968889
max        9757.737089
Name: dirty_price, dtype: float64

In [152]:
filt1_monthly_df["bond_return"].describe()

count    926538.000000
mean         -0.000739
std           0.023069
min          -0.923953
25%          -0.004694
50%           0.000000
75%           0.004313
max           1.373822
Name: bond_return, dtype: float64

#### Bullet Bond Only DF Filtering

In [153]:
filt2_redeemable_filter = filt1_monthly_df["redeemable"] == "N"
filt2_putable_filter = filt1_monthly_df["putable"] == "N"

In [154]:
filt2_monthly_df = filt1_monthly_df[(filt2_redeemable_filter &
                                     filt2_putable_filter)].reset_index(drop=True)

#### Saving Bullet Bond Only DF

In [155]:
filt2_monthly_df.to_csv("bullet_monthly_df.csv", index=False)

#### Bonds w/ Option  Only DF Filtering

In [156]:
filt3_monthly_df = filt1_monthly_df[(~filt2_redeemable_filter &
                                     ~filt2_putable_filter)].reset_index(drop=True)

#### Saving Bond w/ Option Only DF

In [157]:
filt3_monthly_df.to_csv("option_monthly_df.csv", index=False)

### Monthly Yield Calculation - Bullet Bonds

#### Bullet Bond Pricer for Yield Extraction - Overestimates Dirty Price (even at intraday transaction level)

In [99]:
def bulletBondPricer(ytm, coupon, periodicity, face_value, asof_date, maturity_date):
    
    days_per_period_dict = {
        2: 180,
        4: 90,
        6: 60,
        12: 30
    }
    
    #coupon divided by 100 to get percentage terms, then by 2 to get semi annual figure
    cpn = coupon / (periodicity * 100)
    yld = ytm / (periodicity * 100)
    price = 0
    
    #periods left calculated using 30/360 day count basis based on asof_date and maturity_date
    years_remaining = maturity_date.year - asof_date.year
    year_days_remaining = (years_remaining) * 360
    month_days_remaining = (maturity_date.month - asof_date.month) * 30
    day_days_remaining = (maturity_date.day - asof_date.day)
    prds_remaining = (year_days_remaining + month_days_remaining + day_days_remaining) / days_per_period_dict[periodicity]
    whole_prds_remaining = int(prds_remaining)
    frac_prds_remaining = prds_remaining - whole_prds_remaining

    #add remaining discounted cash flows from current period to price
    price += (cpn*face_value) * (1 + yld)**-frac_prds_remaining

    #add discounted cash flows from remaining periods from period 1 to last_period-1 
    for i in range(1,whole_prds_remaining):
        price += (cpn*face_value) * (1 + yld)**-(i+frac_prds_remaining)

    #add final discounted cash flow of coupon and face value to price for last period
    price += ((cpn + 1) * face_value) * (1 + yld)**-(whole_prds_remaining + frac_prds_remaining)
    
    return price

def bulletBondTargetPriceFunction(ytm, dirty_price, coupon, periodicity, face_value, asof_date, maturity_date):

    return (bulletBondPricer(ytm, coupon, periodicity, face_value, asof_date, maturity_date) - dirty_price)**2

def bulletBondYieldExtractor(dirty_price, coupon, periodicity, face_value, asof_date, maturity_date):

    yield_sol = minimize_scalar(bulletBondTargetPriceFunction, bounds=(0.0,500), method="bounded",args=(dirty_price, coupon, periodicity, face_value, asof_date, maturity_date)).x
    
    return yield_sol
    

In [141]:
def svenssonZeroCouponYield(n_years, beta0, beta1, beta2, beta3, tau1, tau2):

    terma = n_years/tau1
    termb = n_years/tau2

    #bounds to deal with numerical errors between the 0 and 1 yr to maturity interval
    
    if abs(terma) < 1e-8:
        terma=1e-8
    if abs(termb) < 1e-8:
        termb=1e-8

    if abs(terma) > 1e-8:
        terma=1e-8
    if abs(termb) > 1e-8:
        termb=1e-8

    term1 = beta0
    term2 = beta1 * ((1-np.exp(-terma))/terma)
    term3 = beta2 * (((1-np.exp(-terma))/terma)-np.exp(-terma))
    term4 = beta3 * (((1-np.exp(-termb))/termb)-np.exp(-termb))

    return term1 + term2 + term3 + term4

In [140]:
def durationWeightedTreasuryReturn(ytm, dirty_price, coupon, periodicity, face_value, asof_date, maturity_date, b0, b1, b2, b3, t1, t2, asof_date_p, b0_p, b1_p, b2_p, b3_p, t1_p, t2_p):

    #_p next to param signifies param from previous asof_date t-1 if current asof_date is t
    
    days_per_period_dict = {
        2: 180,
        4: 90,
        6: 60,
        12: 30
    }

    #asof_date difference, using actual/actual day count basis for zero coupon treasuries
    asof_date_days_diff = (asof_date - asof_date_p).days
    if asof_date.is_leap_year:
        asof_date_yr_diff = asof_date_days_diff / 366
    else:
        asof_date_yr_diff = asof_date_days_diff / 365
    
    #periods left calculated using 30/360 day count basis based on asof_date and maturity_date
    yrs_days_left = (maturity_date.year - asof_date.year) * 360
    mth_days_left = (maturity_date.month - asof_date.month) * 30
    day_days_left = (maturity_date.day - asof_date.day)
    prds_left = (yrs_days_left + mth_days_left + day_days_left) / days_per_period_dict[periodicity]
    whole_prds_left = int(prds_left)
    frac_prds_left = prds_left - whole_prds_left

    #coupon divided by 100 to get percentage terms, then by periodicity to get per-period figure
    cpn = coupon / (periodicity * 100)
    yld = ytm / (periodicity * 100)

    #initialize weighted_return
    weighted_return = 0

    for k in range(0,whole_prds_left+1):

        prds_to_kth_cf = k + frac_prds_left

        if prds_to_kth_cf < (whole_prds_left + frac_prds_left):
            kth_cf = cpn * face_value
        else:
            kth_cf = (cpn + 1) * face_value

        temp_term = 1 / (periodicity * dirty_price)
        temp_term *= kth_cf * ((1 + yld) ** -(prds_to_kth_cf + 1))

        yrs_to_kth_cf = prds_to_kth_cf / periodicity
        prev_yrs_to_kth_cf = yrs_to_kth_cf + asof_date_yr_diff

        zc_trsy_yld = svenssonZeroCouponYield(yrs_to_kth_cf,b0,b1,b2,b3,t1,t2) / 100
        prev_zc_trsy_yld = svenssonZeroCouponYield(prev_yrs_to_kth_cf,b0_p,b1_p,b2_p,b3_p,t1_p,t2_p) / 100

        pv_zc_trsy_val = np.exp(- (yrs_to_kth_cf * zc_trsy_yld))
        prev_pv_zc_trsy_val = np.exp(- (prev_yrs_to_kth_cf * prev_zc_trsy_yld))

        if prev_pv_zc_trsy_val < 1e-8:
            prev_pv_zc_trsy_val = 1e-8
        # zero coupon treasury return
        zc_trsy_ret = (pv_zc_trsy_val / prev_pv_zc_trsy_val) - 1

        temp_term *= zc_trsy_ret

        weighted_return += temp_term

    return weighted_return

#### Loading Bullet Bond DF

In [6]:
bullet_df = pd.read_csv("bullet_monthly_df.csv")
date_cols = [col for col in bullet_df.columns if ("_dt" in col) or ("_date" in col) or (col=="maturity")]
bullet_df[date_cols] = bullet_df[date_cols].apply(pd.to_datetime)

#### Yield Extraction

In [114]:
bullet_df.loc[:,"yield"] = bullet_df.apply(
    lambda row: bulletBondYieldExtractor(row["dirty_price"],
                                         row["coupon"],
                                         2,
                                         row["principal_amt"],
                                         row["month_end_date"],
                                         row["maturity"]),
    axis=1
)

### Monthly Duration-Based Weighting Calculation

#### Merging Continuously Compounded Zero Coupon Yield Curve Params 

In [90]:
yld_curve_params_df = pd.read_csv("svensson_yield_params.csv")
yld_curve_params_lowercase_cols = [col.lower() for col in yld_curve_params_df.columns]
yld_curve_params_col_rename_map = dict(zip(yld_curve_params_df.columns,yld_curve_params_lowercase_cols))
yld_curve_params_df.rename(columns=yld_curve_params_col_rename_map,inplace=True)
yld_curve_params_df["month_end_date"] = pd.to_datetime(yld_curve_params_df["date"].apply(lambda x: f"{x.split('/')[2]}-{x.split('/')[0]}-{x.split('/')[1]}"))
yld_curve_params_df = yld_curve_params_df[["month_end_date","beta0", "beta1", "beta2", "beta3", "tau1", "tau2"]]

In [115]:
bullet_merged_df = bullet_df.merge(yld_curve_params_df,how="left", on="month_end_date")
bullet_merged_df[["beta0_t-1", "beta1_t-1", "beta2_t-1", "beta3_t-1", "tau1_t-1", "tau2_t-1"]] = bullet_merged_df[["beta0", "beta1", "beta2", "beta3", "tau1", "tau2"]].shift()
bullet_merged_df["month_end_date_t-1"] = bullet_merged_df["month_end_date"].shift()

In [116]:
import time

start = time.time()

bullet_merged_df["duration_weighted_treasury_return"] = bullet_merged_df.apply(
    lambda row: durationWeightedTreasuryReturn(row["yield"],
                                               row["dirty_price"], 
                                               row["coupon"],
                                               2,
                                               row["principal_amt"],
                                               row["month_end_date"],
                                               row["maturity"],
                                               row["beta0"],
                                               row["beta1"],
                                               row["beta2"],
                                               row["beta3"],
                                               row["tau1"],
                                               row["tau2"],
                                               row["month_end_date_t-1"],
                                               row["beta0_t-1"],
                                               row["beta1_t-1"],
                                               row["beta2_t-1"],
                                               row["beta3_t-1"],
                                               row["tau1_t-1"],
                                               row["tau2_t-1"],
                                              ),
    axis=1
)

end = time.time()
print(f"Elapsed time: {end - start:.4f} seconds")


Elapsed time: 29.1892 seconds


In [117]:
bullet_merged_df["duration_weighted_treasury_return"].describe()

count    115016.000000
mean          0.000435
std           0.013868
min          -0.329659
25%          -0.001912
50%           0.000273
75%           0.002120
max           1.187875
Name: duration_weighted_treasury_return, dtype: float64

In [136]:
len(bullet_merged_df[abs(bullet_merged_df["duration_weighted_treasury_return"]) > .1]["duration_weighted_treasury_return"]) / len(bullet_merged_df)

0.002219123884050777

In [118]:
bullet_merged_df["yield"].describe()

count    136991.000000
mean         25.747316
std          57.599073
min           0.000006
25%           4.157249
50%           5.566157
75%           7.347759
max         499.999988
Name: yield, dtype: float64

In [137]:
len(bullet_merged_df[bullet_merged_df["yield"] > 100]["yield"]) / len(bullet_merged_df)

0.1077443043703601

In [138]:
len(bullet_merged_df[(bullet_merged_df["yield"]>100)&(bullet_merged_df["defaulted"]=="Y")]) / len(bullet_merged_df[bullet_merged_df["yield"]>100])

0.0037262872628726286