In [1]:
import numpy as np
import pandas as pd
import scipy.stats as ss

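The expression `res = S * ss.norm.cdf(d1) - K * np.exp(-r * T) * (ss.norm.cdf(d2))` in the Black-Scholes convertible-bond pricing code calculates the value of the conversion option embedded in the convertible bond (a European call on the underlying stock), not the price of the whole bond.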

Let's break down this expression:

1. `S * ss.norm.cdf(d1)`: This part calculates the present value of the stock holding at time 0 by multiplying the stock price `S` with the cumulative distribution function (CDF) of `d1`.

2. `K * np.exp(-r * T) * (ss.norm.cdf(d2))`: This part calculates the present value of the strike payment expected at maturity by multiplying the strike price `K` with the risk-free discount factor `np.exp(-r * T)` and the cumulative distribution function (CDF) evaluated at `d2`, which is the risk-neutral probability that the option is exercised.

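3. Subtracting the second value from the first gives the value of the conversion option. The price of the convertible bond is then obtained by adding the straight-bond (bond floor) value to this option value.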

A convertible bond has two components: the bond value and the option value. This formula captures only the option value component; the Black-Scholes model calculates it from the stock price, strike price, risk-free rate, time to maturity, and volatility of the underlying stock, and the bond value is added separately.

In the context of the Black-Scholes Model, `ss.norm.cdf(d1)` refers to the cumulative distribution function (CDF) of a standard normal distribution evaluated at the value of `d1`.

The formula to calculate `d1` in the Black-Scholes Model is:

$$ d_1 = \frac{\ln(S/K) + (r + \frac{1}{2} \sigma^2)T}{\sigma \sqrt{T}} $$

where:
- S is the current stock price
- K is the strike price of the option
- r is the risk-free interest rate
- sigma is the volatility of the stock price
- T is the time to maturity of the option

Once `d1` is calculated, it is passed to the cumulative distribution function (`cdf`) of a standard normal distribution, which returns the probability that a standard normal variable is less than or equal to `d1`.

In the context of the Black-Scholes convertible-bond pricing formula, `ss.norm.cdf(d1)` is the option's delta (the sensitivity of the option value to the stock price), while `ss.norm.cdf(d2)` is the risk-neutral probability that the stock price at maturity is above the conversion price. Both are key components in determining the value of the conversion option.

$$ C = S_t N(d_1) - X e^{-rt} N(d_2) $$

where:
- $ C $ is the price of the call option
- $ S_t $ is the current price of the underlying asset
- $ X $ is the option's strike price
- $ r $ is the risk-free interest rate
- $ t $ is the time to expiration
- $ N(d_1) $ and $ N(d_2) $ are the cumulative distribution function of the standard normal distribution evaluated at $ d_1 $ and $ d_2 $, which are calculated as:
  - $ d_1 = \frac{\ln(S/X) + (r + \frac{1}{2}\sigma^2)t}{\sigma \sqrt{t}} $
  - $ d_2 = d_1 - \sigma \sqrt{t} $

Let me know if you need further assistance or have any more questions!

In [4]:
def NORM_CDF(d_DF: pd.DataFrame):
    """Evaluate the standard normal CDF element-wise.

    :param d_DF: values to transform. A DataFrame or a Series keeps its
        container; anything else (scalar, list, ndarray) is passed straight
        to ``scipy.stats.norm.cdf``.
    :return: the CDF values. For pandas input the index, the columns and the
        Series name are preserved; otherwise whatever scipy returns.
    """
    if isinstance(d_DF, pd.DataFrame):
        # One vectorized scipy call for the whole table instead of one per column.
        return pd.DataFrame(
            ss.norm.cdf(d_DF.to_numpy(dtype=float)),
            index=d_DF.index,
            columns=d_DF.columns,
        )
    if isinstance(d_DF, pd.Series):
        # Series.apply would call scipy once per element; evaluate the vector at once.
        return pd.Series(
            ss.norm.cdf(d_DF.to_numpy(dtype=float)),
            index=d_DF.index,
            name=d_DF.name,
        )
    return ss.norm.cdf(d_DF)

In [6]:
# Sample d1 and d2 values, one row per hypothetical option
data = dict(
    d1=[0.5, 0.8, -0.3, 1.2],
    d2=[-0.7, 1.1, 0.4, -0.9],
)
df_d = pd.DataFrame(data)
df_d

Unnamed: 0,d1,d2
0,0.5,-0.7
1,0.8,1.1
2,-0.3,0.4
3,1.2,-0.9


In [7]:
# Apply the standard normal CDF to every d1/d2 value of the sample frame.
NORM_CDF(df_d)

Unnamed: 0,d1,d2
0,0.691462,0.241964
1,0.788145,0.864334
2,0.382089,0.655422
3,0.88493,0.18406


In [None]:
# Sample Black-Scholes inputs, one option per row:
# S = stock price, K = strike, T = time to maturity, sigma = volatility, r = risk-free rate
data = dict(
    S=[100, 120, 90],
    K=[90, 100, 95],
    T=[1, 2, 1.5],
    sigma=[0.2, 0.25, 0.15],
    r=[0.05, 0.06, 0.04],
)
df = pd.DataFrame(data)

In [None]:
def Cal_option_factor(self, factor):
    """Build Black-Scholes based option factors for convertible bonds.

    Loads the merged frame from ``self.get_data()``, prices the conversion
    option with Black-Scholes, backs out an implied volatility by bisection,
    computes Delta/Gamma/Theta and derives several signal columns.

    :param self: object exposing ``get_data()``; the returned DataFrame must
        contain the columns read below (``Stock_close``, ``CB_ANAL_CONVPRICE``,
        ``CB_ANAL_PTM``, ``annual_vol``, ``CB_ANAL_CONVRATIO``,
        ``CB_ANAL_STRBVALUE``, ``CB_optionvalue``, ``CB_close``,
        ``s_info_windcode``).
    :param factor: frame with ``CB_ret`` and ``ret`` columns; a ``Ret_dif``
        column is written to it in place.
    :return: the enriched frame with every row containing a NaN dropped.
    """
    # NOTE(review): pricing/Greeks and implied volatility use different
    # risk-free rates in the original code; confirm that this is intended.
    PRICING_RATE = 0.024
    IV_RATE = 0.025
    IV_TOLERANCE = 0.000001  # price tolerance of the bisection
    IV_MAX_ITER = 500  # iteration budget of the bisection

    def BS_CB(S, K, T, sigma, r):
        """
        Black-Scholes call value (no redemption feature).
        :param S: stock price
        :param K: conversion price
        :param T: time to maturity
        :param sigma: annualized volatility
        :param r: risk-free rate
        :return: value of the call
        """
        d1 = (np.log(S / float(K)) + (r + 0.5 * (sigma**2)) * T) / (
            np.sqrt(T) * float(sigma))
        d2 = d1 - sigma * np.sqrt(T)
        res = S * ss.norm.cdf(d1) - K * np.exp(-r * T) * (ss.norm.cdf(d2))
        return res

    def calculate_iv(option_price, S, K, T, r):
        """
        Implied volatility by bisection on [0, 1], taking the conversion ratio into account.
        :param option_price: current option price, (full price - bond floor) / conversion ratio
        :param S: stock price
        :param K: conversion price
        :param T: time to maturity
        :param r: risk-free rate
        :return: the volatility whose scaled Black-Scholes price matches
            ``option_price`` within the tolerance, or 0 if the search does
            not converge within the iteration budget
        """
        option_price = option_price * K / 100  # divide by the conversion ratio
        option_price_est = 0  # current estimate of the option price
        top = 1  # upper bound of the volatility bracket
        floor = 0  # lower bound of the volatility bracket
        sigma = (floor + top) / 2  # initial volatility
        count = 0  # iteration counter
        while abs(option_price - option_price_est) > IV_TOLERANCE:
            option_price_est = BS_CB(S, K, T, sigma, r) * K / 100
            if abs(option_price - option_price_est) <= IV_TOLERANCE:
                # sigma reproduces the price: stop before the bracket update
                # below would move it one more step away from the solution.
                break
            count += 1
            if count > IV_MAX_ITER:
                # An option with zero time value has no implied volatility,
                # so stop iterating after a fixed number of attempts.
                sigma = 0
                break

            # Volatility is too low when the model price is below the target, and vice versa.
            if option_price - option_price_est > 0:
                floor = sigma
                sigma = (sigma + top) / 2
            else:
                top = sigma
                sigma = (sigma + floor) / 2
        return sigma

    def Cal_Greeks(S, K, r, T, sigma, option_type, Greeks_type):
        """
        Black-Scholes Delta, Gamma or Theta of a call.
        :param S: stock price
        :param K: conversion price
        :param r: risk-free rate
        :param T: time to maturity
        :param sigma: annualized volatility
        :param option_type: only "Call" is supported (checked for Theta)
        :param Greeks_type: "Delta", "Gamma" or "Theta"
        :return: the requested Greek
        :raises ValueError: for an unsupported option or Greeks type
        """
        sqrt_T = np.sqrt(T)
        # The drift term is multiplied by T; dropping it is only right when T == 1.
        d1 = (np.log(S / K) + (r + sigma**2 / 2) * T) / (sigma * sqrt_T)
        d2 = d1 - sigma * sqrt_T
        # nd1 is the normal *density* at d1, nd2 the *cumulative* probability at d2.
        nd1, nd2 = ss.norm.pdf(d1), ss.norm.cdf(d2)
        if Greeks_type == "Gamma":
            return nd1 / (S * sigma * sqrt_T)
        elif Greeks_type == "Theta":
            if option_type != "Call":
                raise ValueError("Invalid option type. Only 'Call' is supported.")
            return -0.5 * S * sigma * nd1 / sqrt_T - r * K * np.exp(-r * T) * nd2
        elif Greeks_type == "Delta":
            return ss.norm.cdf(d1)
        else:
            raise ValueError("Invalid Greeks type. Must be 'Gamma', 'Theta' or 'Delta'.")

    def gamma_theta_signal(theta, gamma, vol, stock_price, CB_opt_price):
        """Return (theta + 0.5 * (stock_price * vol)^2 * gamma) / CB_opt_price."""
        res = (theta + 0.5 * ((stock_price * vol) ** 2) * gamma) / CB_opt_price
        return res

    def opt_signal(delta, theta, gamma, vol, conv_value, CB_close):
        """Return 0.05 * delta + theta + 0.5 * vol^2 * gamma (last two arguments unused)."""
        res = delta * 0.05 + theta + 0.5 * (vol**2) * gamma
        return res

    merged_df = self.get_data()

    # Black-Scholes value of the option per share, then per bond, then plus the bond floor.
    merged_df["CB_opt_BS_price"] = merged_df.apply(
        lambda row: BS_CB(
            row["Stock_close"],
            row["CB_ANAL_CONVPRICE"],
            row["CB_ANAL_PTM"],
            row["annual_vol"],
            PRICING_RATE,
        ),
        axis=1,
    )
    merged_df["CB_opt_BS_price"] = (
        merged_df["CB_opt_BS_price"] * merged_df["CB_ANAL_CONVRATIO"]
    )
    merged_df["CB_BS_price"] = (
        merged_df["CB_opt_BS_price"] + merged_df["CB_ANAL_STRBVALUE"]
    )
    # NOTE(review): the premium compares Stock_close (not CB_close) with the
    # model bond price; confirm that this is the intended numerator.
    merged_df["BS_price_premium"] = (
        merged_df["Stock_close"] - merged_df["CB_BS_price"]
    ) / (merged_df["CB_BS_price"])
    # Conversion value = stock close of the day * (100 / latest conversion price)
    merged_df["CB_ANAL_CONVVALUE"] = merged_df["Stock_close"] * (
        100 / merged_df["CB_ANAL_CONVPRICE"]
    )

    # NOTE(review): the strike passed here is CB_close / CB_ANAL_CONVRATIO,
    # not CB_ANAL_CONVPRICE as used for the price and the Greeks; confirm.
    merged_df["BS_IV"] = merged_df.apply(
        lambda row: calculate_iv(
            row["CB_optionvalue"] / row["CB_ANAL_CONVRATIO"],
            row["Stock_close"],
            row["CB_close"] / row["CB_ANAL_CONVRATIO"],
            row["CB_ANAL_PTM"],
            IV_RATE,
        ),
        axis=1,
    )
    merged_df["BS_IV_premium"] = merged_df["BS_IV"] - merged_df["annual_vol"]
    merged_df["BS_VOL_ratio"] = merged_df["BS_IV"] / merged_df["annual_vol"]
    merged_df["BS_VOL_premium_ratio"] = (
        merged_df["BS_IV_premium"] / merged_df["annual_vol"]
    )

    # Same inputs for every Greek; only the requested type changes.
    for greek in ("Delta", "Gamma", "Theta"):
        merged_df[greek] = merged_df.apply(
            lambda row, greek=greek: Cal_Greeks(
                row["Stock_close"],
                row["CB_ANAL_CONVPRICE"],
                PRICING_RATE,
                row["CB_ANAL_PTM"],
                row["annual_vol"],
                "Call",
                greek,
            ),
            axis=1,
        )

    # Replace zero option values so the Signal division below never divides by zero.
    merged_df.loc[merged_df["CB_optionvalue"] == 0, "CB_optionvalue"] = 0.5
    merged_df["Signal"] = merged_df.apply(
        lambda row: gamma_theta_signal(
            row["Theta"],
            row["Gamma"],
            row["annual_vol"],
            row["Stock_close"],
            row["CB_optionvalue"] / row["CB_ANAL_CONVRATIO"],
        ),
        axis=1,
    )
    merged_df["Opt_Signal"] = merged_df.apply(
        lambda row: opt_signal(
            row["Delta"],
            row["Theta"],
            row["Gamma"],
            row["annual_vol"],
            row["CB_ANAL_CONVVALUE"],
            row["CB_close"],
        ),
        axis=1,
    )
    merged_df["CB_ret"] = merged_df.groupby("s_info_windcode")["CB_close"].pct_change()
    merged_df["CB_ANAL_CONVPREMIUMRATIO"] = (
        merged_df["CB_close"] - merged_df["CB_ANAL_CONVVALUE"]
    ) / merged_df["CB_ANAL_CONVVALUE"]
    merged_df["Double_low"] = (
        merged_df["CB_ANAL_CONVPREMIUMRATIO"] * 100 + merged_df["CB_close"]
    )
    # NOTE(review): this writes to the caller's `factor`, which is otherwise
    # unused and not part of the returned frame; confirm the intended target.
    factor["Ret_dif"] = factor["CB_ret"] - factor["ret"]
    merged_df = merged_df.dropna()
    return merged_df

In [None]:
def BS_CB(S_DF, K_DF, T_DF, Sigma_DF, r: float):
    """Black-Scholes call value evaluated on whole pandas objects at once.

    :param S_DF: stock prices
    :param K_DF: conversion prices (must support ``.astype``)
    :param T_DF: times to maturity
    :param Sigma_DF: annualized volatilities (must support ``.astype``)
    :param r: risk-free rate shared by all rows
    :return: ``S * N(d1) - K * exp(-r * T) * N(d2)`` element-wise
    """
    strike = K_DF.astype(float)
    vol = Sigma_DF.astype(float)
    root_T = np.sqrt(T_DF)

    d1_DF = (np.log(S_DF / strike) + (r + 0.5 * (Sigma_DF**2)) * T_DF) / (root_T * vol)
    d2_DF = d1_DF - Sigma_DF * root_T

    stock_leg = S_DF * NORM_CDF(d1_DF)
    strike_leg = K_DF * np.exp(-r * T_DF) * (NORM_CDF(d2_DF))
    return stock_leg - strike_leg

def BS_CB_LOOP(Option_price_DF, option_price_est_DF, S_DF, K_DF, T_DF, r: float):
    """Bisection search for the implied volatility of every row at once.

    Each row brackets its volatility in [0, 1] and halves the bracket until
    the model price ``BS_CB(...) * K / 100`` matches the target price within
    the tolerance. Rows that already match are frozen while the others keep
    iterating.

    :param Option_price_DF: target option prices (already scaled by K / 100)
    :param option_price_est_DF: initial price estimates, same index
    :param S_DF: stock prices
    :param K_DF: conversion prices
    :param T_DF: times to maturity
    :param r: risk-free rate shared by all rows
    :return: Series of implied volatilities indexed like ``Option_price_DF``;
        rows that do not converge within the iteration budget are set to 0
    """
    TOLERANCE = 0.000001
    MAX_ITER = 500

    index = Option_price_DF.index
    floor_DF = pd.Series(0.0, index=index, name='floor')  # lower volatility bound
    top_DF = pd.Series(1.0, index=index, name='top')  # upper volatility bound
    sigma_DF = (top_DF + floor_DF) / 2
    sigma_DF.name = 'sigma'
    estimate_DF = option_price_est_DF.astype(float).copy()

    # Rows whose estimate is still further than the tolerance from the target.
    active = (Option_price_DF - estimate_DF).abs() > TOLERANCE
    for count in range(1, MAX_ITER + 2):
        if not active.any():
            break
        repriced = BS_CB(S_DF, K_DF, T_DF, sigma_DF, r) * K_DF / 100
        estimate_DF[active] = repriced[active]
        gap = Option_price_DF - estimate_DF
        # Freeze rows that now match, before their sigma is moved again.
        active &= gap.abs() > TOLERANCE
        if count > MAX_ITER:
            # An option with zero time value has no implied volatility:
            # give up on the rows that are still unresolved.
            sigma_DF[active] = 0.0
            break
        raise_vol = active & (gap > 0)  # model price too low -> volatility too low
        lower_vol = active & ~raise_vol
        floor_DF[raise_vol] = sigma_DF[raise_vol]
        top_DF[lower_vol] = sigma_DF[lower_vol]
        sigma_DF[active] = (floor_DF[active] + top_DF[active]) / 2
    return sigma_DF
    

def CALCULATE_IV(Option_price_DF, S_DF, K_DF, T_DF, r: float):
    """Prepare the inputs and delegate the implied-volatility search to BS_CB_LOOP.

    NOTE(review): a later ``def CALCULATE_IV`` in this cell shadows this one.
    NOTE(review): the result is whatever BS_CB_LOOP returns, so this relies on
    that loop terminating and returning the volatilities.

    :param Option_price_DF: option prices before scaling by K / 100
    :param S_DF: stock prices
    :param K_DF: conversion prices
    :param T_DF: times to maturity
    :param r: risk-free rate shared by all rows
    :return: the value returned by ``BS_CB_LOOP``
    """
    # Divide by the conversion ratio, i.e. multiply by K / 100.
    Option_price_DF = (Option_price_DF * K_DF / 100).rename('Option_price')
    # Initial price estimate of 0 for every row.
    option_price_est_DF = pd.Series(0, index=Option_price_DF.index,
                                    name="option_price_est")
    # rename() returns renamed copies, so the caller's Series keep their names.
    return BS_CB_LOOP(
        Option_price_DF,
        option_price_est_DF,
        S_DF.rename('S'),
        K_DF.rename('K'),
        T_DF.rename('T'),
        r,
    )

def CALCULATE_IV(Option_price_DF, S_DF, K_DF, T_DF, r: float):
    """Implied volatility of every row, found by a scalar bisection per row.

    :param Option_price_DF: option prices before scaling by K / 100
    :param S_DF: stock prices
    :param K_DF: conversion prices
    :param T_DF: times to maturity
    :param r: risk-free rate shared by all rows
    :return: Series of implied volatilities; a row that does not converge
        within the iteration budget yields 0
    """
    TOLERANCE = 0.000001
    MAX_ITER = 500

    def BS_CB(S, K, T, sigma, r):
        """Black-Scholes call value (no redemption feature) for scalar inputs."""
        d1 = (np.log(S / float(K)) + (r + 0.5 * (sigma**2)) * T) / (
            np.sqrt(T) * float(sigma)
        )
        d2 = d1 - sigma * np.sqrt(T)
        res = S * ss.norm.cdf(d1) - K * np.exp(-r * T) * (ss.norm.cdf(d2))
        return res

    def Loop_Check(row):
        """Bisect the volatility on [0, 1] until the model price matches the row's price."""
        count = 0  # iteration counter
        top = 1  # upper bound of the volatility bracket
        floor = 0  # lower bound of the volatility bracket
        sigma = (floor + top) / 2  # initial volatility
        Option_price = row["Option_price"]
        option_price_est = row["option_price_est"]
        S = row["S"]
        K = row["K"]
        T = row["T"]
        # abs() is required: without it any overshoot of the target price
        # (a negative difference) would end the search immediately.
        while abs(Option_price - option_price_est) > TOLERANCE:
            option_price_est = BS_CB(S, K, T, sigma, r) * K / 100
            if abs(Option_price - option_price_est) <= TOLERANCE:
                # sigma reproduces the price: stop before the bracket update
                # below would move it one more step away from the solution.
                break
            count += 1
            if count > MAX_ITER:
                # An option with zero time value has no implied volatility,
                # so stop iterating after a fixed number of attempts.
                sigma = 0
                break
            # Volatility is too low when the model price is below the target, and vice versa.
            if Option_price - option_price_est > 0:
                floor = sigma
                sigma = (sigma + top) / 2
            else:
                top = sigma
                sigma = (sigma + floor) / 2
        return sigma

    # Divide by the conversion ratio, i.e. multiply by K / 100.
    scaled_price = (Option_price_DF * K_DF / 100).rename("Option_price")
    initial_estimate = pd.Series(0, index=scaled_price.index,
                                 name="option_price_est")

    # rename() returns renamed copies, so the caller's Series keep their names.
    MERGE_DF = pd.concat(
        [scaled_price, initial_estimate,
         S_DF.rename("S"), K_DF.rename("K"), T_DF.rename("T")], axis=1)
    if MERGE_DF.empty:
        # A row-wise apply on an empty frame would not yield a Series of volatilities.
        return pd.Series(index=MERGE_DF.index, dtype=float)
    return MERGE_DF.apply(Loop_Check, axis=1)

def BS_IV(MERGE_DF: pd.DataFrame):
    """Add a ``BS_IV`` (implied volatility) column to ``MERGE_DF``.

    :param MERGE_DF: frame with the columns ``CB_optionvalue``,
        ``CB_ANAL_CONVRATIO``, ``Stock_close``, ``CB_close`` and
        ``CB_ANAL_PTM``; it is modified in place.
    :return: the same frame, with the ``BS_IV`` column added
    """
    IV_RATE = 0.025  # risk-free rate used for the implied volatility
    IV_TOLERANCE = 0.000001  # price tolerance of the bisection
    IV_MAX_ITER = 500  # iteration budget of the bisection

    def BS_CB(S, K, T, sigma, r):
        """
        Black-Scholes call value (no redemption feature).
        :param S: stock price
        :param K: conversion price
        :param T: time to maturity
        :param sigma: annualized volatility
        :param r: risk-free rate
        :return: value of the call
        """
        d1 = (np.log(S / float(K)) + (r + 0.5 * (sigma**2)) * T) / (
            np.sqrt(T) * float(sigma))
        d2 = d1 - sigma * np.sqrt(T)
        res = S * ss.norm.cdf(d1) - K * np.exp(-r * T) * (ss.norm.cdf(d2))
        return res

    def calculate_iv(option_price, S, K, T, r):
        """
        Implied volatility by bisection on [0, 1], taking the conversion ratio into account.
        :param option_price: current option price, (full price - bond floor) / conversion ratio
        :param S: stock price
        :param K: conversion price
        :param T: time to maturity
        :param r: risk-free rate
        :return: the volatility whose scaled Black-Scholes price matches
            ``option_price`` within the tolerance, or 0 if the search does
            not converge within the iteration budget
        """
        option_price = option_price * K / 100  # divide by the conversion ratio
        option_price_est = 0  # current estimate of the option price
        top = 1  # upper bound of the volatility bracket
        floor = 0  # lower bound of the volatility bracket
        sigma = (floor + top) / 2  # initial volatility
        count = 0  # iteration counter
        while abs(option_price - option_price_est) > IV_TOLERANCE:
            option_price_est = BS_CB(S, K, T, sigma, r) * K / 100
            if abs(option_price - option_price_est) <= IV_TOLERANCE:
                # sigma reproduces the price: stop before the bracket update
                # below would move it one more step away from the solution.
                break
            count += 1
            if count > IV_MAX_ITER:
                # An option with zero time value has no implied volatility,
                # so stop iterating after a fixed number of attempts.
                sigma = 0
                break
            # Volatility is too low when the model price is below the target, and vice versa.
            if option_price - option_price_est > 0:
                floor = sigma
                sigma = (sigma + top) / 2
            else:
                top = sigma
                sigma = (sigma + floor) / 2
        return sigma

    # NOTE(review): the strike passed here is CB_close / CB_ANAL_CONVRATIO
    # rather than a conversion-price column; confirm that this is intended.
    MERGE_DF["BS_IV"] = MERGE_DF.apply(
        lambda row: calculate_iv(
            row["CB_optionvalue"] / row["CB_ANAL_CONVRATIO"],
            row["Stock_close"],
            row["CB_close"] / row["CB_ANAL_CONVRATIO"],
            row["CB_ANAL_PTM"],
            IV_RATE,
        ),
        axis=1,
    )
    return MERGE_DF

In [None]:
# NOTE(review): this binds the name to the DataFrame *class*, not to an instance
# holding data, so the column reads/writes in the cells below cannot work on it.
# Presumably a placeholder for the real merged frame; confirm before running.
MERGE_DF = pd.DataFrame

def OPT_SIGNAL(Delta_DF, Theta_DF, Gamma_DF, Vol_DF, CB_ANAL_CON_DF, CB_CLOSE_DF):
    """Combine Delta, Theta and Gamma into the option signal.

    Computes ``0.05 * Delta + Theta + 0.5 * Vol**2 * Gamma`` element-wise.
    ``CB_ANAL_CON_DF`` and ``CB_CLOSE_DF`` are accepted but not used.
    """
    delta_term = Delta_DF * 0.05
    gamma_term = 0.5 * (Vol_DF**2) * Gamma_DF
    return delta_term + Theta_DF + gamma_term
# Store the option signal as a new column. Only the Delta, Theta, Gamma and
# annual_vol columns enter the result; OPT_SIGNAL ignores its last two arguments.
MERGE_DF["Opt_Signal"] = OPT_SIGNAL(
    MERGE_DF["Delta"],
    MERGE_DF["Theta"],
    MERGE_DF["Gamma"],
    MERGE_DF["annual_vol"],
    MERGE_DF["CB_ANAL_CONVVALUE"],
    MERGE_DF["CB_close"],
)

# def OPT_SIGNAL(MERGE_DF: pd.DataFrame):
#     def opt_signal(delta, theta, gamma, vol, conv_value, CB_close):
#         res = delta * 0.05 + theta + 0.5 * (vol**2) * gamma
#         return res

#     MERGE_DF["Opt_Signal"] = MERGE_DF.apply(
#         lambda row: opt_signal(
#             row["Delta"],
#             row["Theta"],
#             row["Gamma"],
#             row["annual_vol"],
#             row["CB_ANAL_CONVVALUE"],
#             row["CB_close"],
#         ),
#         axis=1,
#     )
#     return MERGE_DF

In [None]:
def GAMMA_THETA_SIGNAL(Theta_DF, Gamma_DF, Vol_DF, Stock_Price_DF, CB_opt_price_DF):
    """Gamma/Theta signal relative to the option price.

    Computes ``(Theta + 0.5 * (Stock_Price * Vol)**2 * Gamma) / CB_opt_price``
    element-wise.
    """
    price_vol_sq = (Stock_Price_DF * Vol_DF) ** 2
    numerator = Theta_DF + 0.5 * price_vol_sq * Gamma_DF
    return numerator / CB_opt_price_DF
# Store the gamma/theta signal as a new column; the option price passed in is
# CB_optionvalue divided by CB_ANAL_CONVRATIO.
MERGE_DF["Signal"] = GAMMA_THETA_SIGNAL(
    MERGE_DF["Theta"], 
    MERGE_DF["Gamma"], 
    MERGE_DF["annual_vol"], 
    MERGE_DF["Stock_close"],
    MERGE_DF["CB_optionvalue"] / MERGE_DF["CB_ANAL_CONVRATIO"]
)

# def GAMMA_THETA_SIGNAL(MERGE_DF: pd.DataFrame):
#     def gamma_theta_signal(theta, gamma, vol, stock_price, CB_opt_price):
#         res = (theta + 0.5 * ((stock_price * vol) ** 2) * gamma) / CB_opt_price
#         return res
    
#     MERGE_DF["Signal"] = MERGE_DF.apply(
#         lambda row: gamma_theta_signal(
#             row["Theta"],
#             row["Gamma"],
#             row["annual_vol"],
#             row["Stock_close"],
#             row["CB_optionvalue"] / row["CB_ANAL_CONVRATIO"],
#         ),
#         axis=1,
#     )
#     return MERGE_DF

In [None]:
# def Cal_Greeks(S, K, r, T, sigma, option_type, Greeks_type):
#     d1 = (np.log(S / K) + (r + sigma**2 / 2) * T) / (sigma * np.sqrt(T))
#     d2 = d1 - sigma * np.sqrt(T)

#     d1 = (np.log(S / K) + (r + 0.5 * sigma**2)) / ((T**0.5) * sigma)
#     d2 = (np.log(S / K) + (r - 0.5 * sigma**2)) / ((T**0.5) * sigma)
#     nd1, nd2 = ss.norm.pdf(d1), ss.norm.cdf(d2)
#     if Greeks_type == "Gamma":
#         gamma = nd1 / (S * sigma * (T**0.5))
#         return gamma

#     elif Greeks_type == "Theta":
#         if option_type == "Call":
#             theta = -0.5 * S * sigma * nd1 / (T**0.5) - r * K * np.exp(-r * T) * nd2
#         else:
#             raise ValueError("Invalid option type. Must be 'Call' or 'Put'.")
#         return theta
#     elif Greeks_type == "Delta":
#         delta = ss.norm.cdf(d1)
#         return delta
#     else:
#         raise ValueError("Invalid Greeks type. Must be 'Call' or 'Put'.")

def CAL_GREEKS(S_DF, K_DF, r: float, T_DF, Sigma_DF, Option_type: str, Greeks_type: str):
    """Black-Scholes Delta, Gamma or Theta of a call, evaluated element-wise.

    :param S_DF: stock prices
    :param K_DF: conversion prices
    :param r: risk-free rate shared by all rows
    :param T_DF: times to maturity
    :param Sigma_DF: annualized volatilities
    :param Option_type: only "Call" is supported (checked for Theta)
    :param Greeks_type: "Delta", "Gamma" or "Theta"
    :return: the requested Greek, in the same pandas container as the inputs
    :raises ValueError: for an unsupported option or Greeks type
    """
    def _like(values, template):
        # scipy returns plain arrays; restore the pandas container of d1/d2.
        if isinstance(template, pd.Series):
            return pd.Series(values, index=template.index)
        if isinstance(template, pd.DataFrame):
            return pd.DataFrame(values, index=template.index, columns=template.columns)
        return values

    sqrt_T = np.sqrt(T_DF)
    # The drift term is multiplied by T; dropping it is only right when T == 1.
    d1_DF = (np.log(S_DF / K_DF) + (r + Sigma_DF**2 / 2) * T_DF) / (Sigma_DF * sqrt_T)
    d2_DF = d1_DF - Sigma_DF * sqrt_T

    if Greeks_type == "Gamma":
        # Gamma uses the normal *density* at d1, not the cumulative probability.
        pdf_d1_DF = _like(ss.norm.pdf(d1_DF), d1_DF)
        return pdf_d1_DF / (S_DF * Sigma_DF * sqrt_T)

    elif Greeks_type == "Theta":
        if Option_type != "Call":
            raise ValueError("Invalid option type. Only 'Call' is supported.")
        pdf_d1_DF = _like(ss.norm.pdf(d1_DF), d1_DF)
        cdf_d2_DF = _like(ss.norm.cdf(d2_DF), d2_DF)
        return -0.5 * S_DF * Sigma_DF * pdf_d1_DF / sqrt_T - r * K_DF * np.exp(-r * T_DF) * cdf_d2_DF
    elif Greeks_type == "Delta":
        return _like(ss.norm.cdf(d1_DF), d1_DF)
    else:
        raise ValueError("Invalid Greeks type. Must be 'Gamma', 'Theta' or 'Delta'.")
# Store the call Theta from CAL_GREEKS as a new column, with 0.024 as the risk-free rate.
MERGE_DF["Theta"] = CAL_GREEKS(
    MERGE_DF["Stock_close"], 
    MERGE_DF["CB_ANAL_CONVPRICE"], 
    0.024,
    MERGE_DF["CB_ANAL_PTM"],
    MERGE_DF["annual_vol"],
    "Call",
    "Theta"
)

# def GAMMA(MERGE_DF: pd.DataFrame):
#     MERGE_DF["Theta"] = MERGE_DF.apply(
#         lambda row: Cal_Greeks(
#             row["Stock_close"],
#             row["CB_ANAL_CONVPRICE"],
#             0.024,
#             row["CB_ANAL_PTM"],
#             row["annual_vol"],
#             "Call",
#             "Theta",
#         ),
#         axis=1,
#     )
#     return MERGE_DF

In [None]:
# Store the call Delta from CAL_GREEKS as a new column, with 0.024 as the risk-free rate.
MERGE_DF["Delta"] = CAL_GREEKS(
    MERGE_DF["Stock_close"], 
    MERGE_DF["CB_ANAL_CONVPRICE"], 
    0.024,
    MERGE_DF["CB_ANAL_PTM"],
    MERGE_DF["annual_vol"],
    "Call",
    "Delta"
)

# def DELTA(MERGE_DF: pd.DataFrame):
#     MERGE_DF["Delta"] = MERGE_DF.apply(
#         lambda row: Cal_Greeks(
#             row["Stock_close"],
#             row["CB_ANAL_CONVPRICE"],
#             0.024,
#             row["CB_ANAL_PTM"],
#             row["annual_vol"],
#             "Call",
#             "Delta",
#         ),
#         axis=1,
#     )
#     return MERGE_DF