In [1]:
import math
import sys
import numpy as np
import pandas as pd
import scipy.integrate
import scipy.optimize
import scipy.stats
from pylab import plt
from mpl_toolkits.mplot3d import Axes3D
plt.style.use('ggplot')
%matplotlib inline

In [2]:
class ImpliedVol:
    
    DBL_DIG = sys.float_info.dig
    DBL_EPSILON = sys.float_info.epsilon  # smallest such that 1.0+DBL_EPSILON != 1.0
    DBL_MIN = sys.float_info.min
    SQRT_DBL_EPSILON = math.sqrt(DBL_EPSILON)
    FOURTH_ROOT_DBL_EPSILON = math.sqrt(SQRT_DBL_EPSILON)
    ONE_DIV_SQRT_TWO_PI = 0.3989422804014326779399460599343818684758586311649
    
    def __init__(self, input_premium, forward, displacement, strike, expiry_time, call_put):
        self.input_premium = input_premium
        self.forward = forward
        self.displacement = displacement
        self.strike = strike
        self.expiry_time = expiry_time
        self.call_put = call_put
        
    def get_sigma_high(self, beta, b_c, b_max, phi_c):
        """
        The 'high' initial guess vol function
        :param beta: float
        :param b_c: float
        :param b_max: float
        :param phi_c: float
        :return: float
        """
        return -2.0 * scipy.stats.norm.ppf((b_max - beta) / (b_max - b_c) * phi_c)

    def get_sigma_low(self, beta, ln_b_c, two_x_square, ax):
        """
        The 'low' initial guess vol function
        :param beta: float
        :param ln_b_c: float
        :param two_x_square: float
        :param ax: float
        :return: float
        """
        return math.sqrt(two_x_square / (ax - 4.0 * (math.log(beta) - ln_b_c)))
    
    def cube(x):
        """
        Little helper function, calculates x * x * x
        :param x: float
        :return: float
        """
        return x * x * x

    def normalised_black_call(self, x, sigma):
        """
        The normalized Black call formula is the one where
        1. the argument x = ln(forward / strike)
        2. its value is b(.) = [black formula] / sqrt(forward * strike)
        :param x: float
        :param sigma: float
        :return: float
        """
        x2 = x * x
        s2 = sigma * sigma
        b_max = math.exp(0.5 * x)
        one_over_b_max = 1.0 / b_max
        if x2 < self.DBL_EPSILON * s2 or (x2 + s2) < self.DBL_EPSILON:
            if s2 * s2 > self.DBL_EPSILON:
                b0 = 1.0 - 2.0 * scipy.stats.norm.cdf(-0.5 * sigma)
            else:
                b0 = self.ONE_DIV_SQRT_TWO_PI * sigma * (
                    1.0 - s2 * (1.0 / 24.0 - s2 * (1.0 / 640.0 - s2 * (1.0 / 21504.0 - s2 / 884736.0)))
                )
            return max(b0 + 0.5 * x, 0.0)
        xi = x / sigma
        if s2 < self.DBL_EPSILON * x2:
            phi0 = math.exp(-0.5 * xi * xi) * self.ONE_DIV_SQRT_TWO_PI
            xi2 = xi * xi
            return max(
                phi0 * math.exp(-0.125 * s2) * 4.0 * sigma / cube(4.0 * xi2 - s2) *
                (8.0 * xi2 * (2.0 * xi2 - s2 - 6.0) + s2 * (s2 - 4.0)),
                0.0,
            )
        return max(
            scipy.stats.norm.cdf(xi + 0.5 * sigma) * b_max - scipy.stats.norm.cdf(xi - 0.5 * sigma) * one_over_b_max,
            0.0
        )

    def intrinsic_value(self, forward, strike, option_type):
        """
        Intrinsic value of the option
        :param forward: float
        :param strike: float
        :param option_type: OptionType
        :return: float
        """
        return max(float(option_type) * (forward - strike), 0.0)

    def implied_volatility(self):
        """
        The Black implied volatility function
        :param self.input_premium: float
        :param forward: float
        :param strike: float
        :param expiry_time: float
        :param call_put: str. 'call' or 'put'
        :return: float
        """
        disp_forward = self.forward + self.displacement
        if self.call_put == 'call':
            option_type = 1
        else:
            option_type = -1
        disp_strike = self.strike + self.displacement
        intrinsic = self.intrinsic_value(disp_forward, disp_strike, option_type)
        if self.input_premium == intrinsic:
            return 0.0
        if intrinsic >= self.input_premium:
            raise ValueError(
                "Black implied_volatility: cannot calculate implied volatility if specified premium is less than the "
                "minimum possible price. In this case, the given premium was " + str(self.input_premium) + " and the "
                "intrinsic value is " + str(intrinsic)
            )
        input_premium_upper_bound = disp_strike if option_type == -1 else disp_forward 
        if self.input_premium >= input_premium_upper_bound:
            raise ValueError(
                "Black implied volatility: the given premium is higher than the option price for infinite implied "
                "volatility and thus there is no implied volatility that matches this price."
            )

        premium = self.input_premium - intrinsic
        # Step 2
        ax = abs(math.log(disp_forward/disp_strike))
        x = -ax
        beta = premium / math.sqrt(disp_forward * disp_strike)
        ln_beta = math.log(beta)
        b_max = math.exp(0.5 * x)
        # 2.10
        sigma_c = math.sqrt(2.0 * ax)
        # 3.2
        b_c = self.normalised_black_call(x, sigma_c)
        phi_c = scipy.stats.norm.cdf(-0.5 * sigma_c)
        # 
        sigma_high = self.get_sigma_high(beta, b_c, b_max, phi_c)
        relative_accuracy = self.SQRT_DBL_EPSILON * self.FOURTH_ROOT_DBL_EPSILON
        sigma = sigma_high
        previous_sigma = sys.float_info.max
        beta_less_than_b_c = beta < b_c
        if beta_less_than_b_c:
            sigma_star = self.get_sigma_high(0, b_c, b_max, phi_c)
            b_star = self.normalised_black_call(x, sigma_star)
            x2 = x * x
            two_x_square = 2.0 * x2
            ln_b_c = math.log(b_c)
            sigma_low_star = self.get_sigma_low(b_star, ln_b_c, two_x_square, ax)
            sigma_high_star = self.get_sigma_high(b_star, b_c, b_max, phi_c)
            gamma = float(
                math.log((sigma_star - sigma_low_star) / (sigma_high_star - sigma_low_star)) / math.log(b_star / b_c)
            )
            xi = beta / b_c
            w = pow(xi, gamma)
            sigma *= w
            sigma += (1.0 - w) * self.get_sigma_low(beta, ln_b_c, two_x_square, ax)
            if self.normalised_black_call(x, sigma) < self.DBL_MIN:
                sigma += sigma_star
                sigma *= 0.5
                if self.normalised_black_call(x, sigma) < self.DBL_MIN:
                    sigma += sigma_c
                    sigma *= 0.5
        i = 0
        while i < ((self.DBL_DIG * 10) / 3) and abs(sigma / previous_sigma - 1.0) > relative_accuracy:
            i += 1
            previous_sigma = sigma
            half_sigma = 0.5 * sigma
            xoversigma = x / sigma
            xoversigma2 = xoversigma * xoversigma
            b = self.normalised_black_call(x, sigma)
            vega = self.ONE_DIV_SQRT_TWO_PI * math.exp(-0.5*(xoversigma2+half_sigma*half_sigma))
            vannaovervega = xoversigma2 / sigma - 0.25 * sigma
            if beta_less_than_b_c:
                ln_b = math.log(b)
                nu = max((ln_beta - ln_b) * ln_b / ln_beta * b / vega, -half_sigma)
                eta = max(0.5 * nu * (vannaovervega - (2.0 / ln_b + 1.0) * vega / b), -0.75)
            else:
                nu = max((beta - b) / vega, -half_sigma)
                eta = max(0.5 * nu * vannaovervega, -0.75)
            sigma += max(nu / (1.0 + eta), -half_sigma)
        # The only situations observed thus far when the above iteration doesn't converge within (DBL_DIG*10)/3 == 50
        # iterations are when
        # a) the input option price is non-zero but so small that it is stored as a so-called denormalised floating
        #    point number which means that there aren't enough digits of significance in it to be able to converge to
        #    three quarters of machine accuracy (relative) in sigma
        # b) the input option price is so close to its theoretical maximum such that b_max-beta doesn't contain enough
        #    digits of significance to be able to converge to three quarters of machine accuracy (relative) in sigma
        # In those cases, returning whateve was achieved thus far is an acceptable course of action.
        return sigma / math.sqrt(self.expiry_time)


In [3]:
ImpliedVol(0.00803496156156, 0.0140801913058, 0.007, 0.0140801913058, 30.0, 'put').implied_volatility()

0.18166038430851289

In [4]:
test = ImpliedVol(0.00803496156156, 0.0140801913058, 0.007, 0.0140801913058, 30.0, 'put')

In [9]:
class CashPhy:
    def __init__(self, vol_atm_pay, vol_atm_rec, expiry_time, displacement, forward, strike, pvbp_cash,
                 vol_phy, number_payments):
        self.vol_atm_pay = vol_atm_pay
        self.vol_atm_rec = vol_atm_rec
        self.expiry_time = expiry_time
        self.displacement = displacement
        self.forward = forward
        self.strike = strike
        self.pvbp_cash = pvbp_cash
        self.vol_phy = vol_phy
        self.number_payments = number_payments
        #self.payoff_func = max(float(option_type) * (forward_phy - self.strike), 0.0)
        self.vol_atm_cash, self.displacement_star = self.calibrate()
        
    def get_diffused_payoff(self, z, option_type, cash_settled_displacement, cash_settled_vol):
        exp_arg = self.vol_phy * math.sqrt(self.expiry_time) * z - 0.5 * self.vol_phy * self.vol_phy * self.expiry_time
        exp_arg = min(exp_arg, math.log(sys.float_info.max))
        exp = math.exp(exp_arg)
        forward_phy = (self.forward + self.displacement) * exp - self.displacement
        annuity = 0.0
        tmp = 1.0
        for j in range(0, self.number_payments):
            tmp /= 1.0 + forward_phy
            annuity += tmp
        exp_arg_cash_settled = cash_settled_vol * math.sqrt(self.expiry_time) * z - 0.5 * cash_settled_vol * cash_settled_vol * self.expiry_time
        exp_arg_cash_settled = min(exp_arg_cash_settled, math.log(sys.float_info.max))
        exp_cash_settled = math.exp(exp_arg_cash_settled)
        forward_cash_settled = (self.forward + cash_settled_displacement) * exp_cash_settled - cash_settled_displacement
        numeraire = 0.0
        tmp = 1.0
        for j in range(0, self.number_payments):
            tmp /= 1.0 + forward_cash_settled
            numeraire += tmp
        payoff = self.pvbp_cash * payoff_func * annuity / numeraire
        return payoff
    
    def get_unit_payoff(self, z, cash_settled_displacement, cash_settled_vol):
        self.payoff_func = 1
        self.get_diffused_payoff(self, z, cash_settled_displacement, cash_settled_vol)
        
    def get_payoff(self, z, option_type, cash_settled_displacement, cash_settled_vol):
        self.payoff_func = max(float(option_type) * (forward_phy - self.strike), 0.0)
        self.get_diffused_payoff(self, z, cash_settled_displacement, cash_settled_vol)
    
    def calibrate(self):
        """
        Solve for cash-settled vol & displacement such that the payer & receiver ATM cash settled vols are matched
        :return: pair (tuple) of cash-settled ATM vol and cash-settled displacement
        """
        calibration_result = scipy.optimize.root(
            self.get_zero_width_collar_error,
            np.array([self.vol_phy, self.displacement])
                  )
        cash_settled_vol_atm, cash_settled_displacement = calibration_result.x
        return cash_settled_vol_atm, cash_settled_displacement
    
    def get_zero_width_collar_error(self, cash_settled_vol_and_displacement):
        """
        Get zero-width collar error: pair of difference (model versus target) between payer & receiver cash settled vols
        :param cash_settled_vol_and_displacement: numpy array with two entries, the cash-settled vol & displacement
        :return: numpy array
        """
        target_vols = [('call', self.vol_atm_pay), ('put', self.vol_atm_rec)]
        cash_settled_vol, cash_settled_displacement = cash_settled_vol_and_displacement
        vol_diffs = []
        for option_type, target_vol in target_vols:
            implied_vol_cash = self.get_implied_vol(option_type, cash_settled_displacement, cash_settled_vol)
            vol_diffs.append(implied_vol_cash - target_vol)
        return np.array(vol_diffs)
    
    def get_implied_vol(self, option_type, cash_settled_displacement, cash_settled_vol):
                    
        """
        Integrate the model and imply the vol
        :param option_type: OptionType
        :return: float
        """
        model_value = self.integrate(self.get_payoff_cash, option_type, cash_settled_displacement, cash_settled_vol)
        model_value_adj = model_value / self.pvbp_cash
        result = ImpliedVol(model_value_adj, self.forward, self.displacement, self.strike, self.expiry_time,
                                    option_type).implied_volatility()
        return result
    
    def integrate(self, payoff_function, *args, **kwargs):
        """
        Integrate the function f over the kernel
        :param f: function
        :param args: list. Additional arguments for function f
        :param kwargs: dict. Key word arguments
        :return: tuple where the first entry represents the resulting integral value
        """
        results = scipy.integrate.quad(self.kernel, -np.inf, np.inf, (payoff_function, *args), **kwargs)
        return results[0]
    
    def kernel(self, z, f, *args):
        """
        The kernel function
        :param z: float. Gaussian deviate
        :param f: function to integrate
        :param args: list. Additional arguments for function f
        :return: float
        """
        return 1.0 / ((2.0 * math.pi) ** 0.5) * f(z, *args) * math.exp(-0.5 * z ** 2)
    


DF                0.921604
Fwd              0.0206367
Fwd*             0.0207853
Shift                 0.01
Shift*           0.0182102
PVBP               8.30754
PVBP*              8.24808
Ann Cash           8.25088
ATM Phy           0.204573
ATM Cash*          0.17587
ATM Cash Rec      0.202329
ATM Cash Pay      0.206379
ATM PCC         0.00122573

self.vol_atm_pay = vol_atm_pay
        self.vol_atm_rec = vol_atm_rec
        self.expiry_time = expiry_time
        self.displacement = displacement
        self.forward = forward
        self.strike = strike
        self.pvbp_cash = pvbp_cash
        self.vol_phy = vol_phy
        self.number_payments = number_payments

In [10]:
a = CashPhy(0.206379, 0.202329, 10, 0.01, 0.0206367, 0.0206367, 8.25088, 0.204573, 10)

In [8]:
a.calibrate()

(0.17586420143422221, 0.018211920543946675)

In [11]:
a.displacement_star

0.018211920543946675