# Solvency 2 - Rates
_Author: Adrian Haerle_ <br>
The below script approximates the extrapolated interest rate curves described by EIOPA.

In [2]:
import numpy as np
import pandas as pd
from math import exp, log
from scipy.optimize import minimize
from scipy.interpolate import interp1d

## Helper functions
The helper functions are simplified transformations of rates. 

In [None]:
def dcf_to_zero(dcf):
    """
    Helper function transforms sorted discount factors to zero rates
    """
    num_dcf = len(dcf)
    zero_rates = num_dcf * [0]
    
    #Loop throguh remaining dcf
    for index, dcf_ in enumerate(dcf):
        
        #Time to maturity
        time_to_maturity = index + 1 
        
        #Zero rates
        zero_rates[index] = (1./dcf_)**(1/time_to_maturity) - 1
    
    #Return
    return zero_rates

In [None]:
def zero_to_dcf(zero_rates):
    """
    Helper function transforms sorted zero rates to discount factors
    """
    num_rates = len(zero_rates)
    dcf = num_rates * [0]
    
    #Loop throguh remaining rates
    for index, rate in enumerate(zero_rates):
        #Time to maturity
        time_to_maturity = index + 1 
        
        #Discount factors
        dcf[index] = 1 / (1+rate)**(time_to_maturity)

    return dcf

In [None]:
def swap_to_dcf(swap_rates):
    """
    Helper function transforms sorted swap rates to discount factors
    """
    num_rates = len(swap_rates)
    dcf = num_rates * [0]
    
    #Loop throguh remaining dcf
    for index, rate in enumerate(swap_rates):
        if index == 0:
            dcf[index] = 1./(1.+rate)
        else:
            dcf[index] = (1-(rate*sum(dcf[0:index+1]))) / (1.+rate)

    return dcf

In [None]:
def dcf_to_swap(dcf):
    """
    Helper function transforms sorted discount factors to swap rates
    """
    num_dcf = len(dcf)
    swap_rates = num_dcf * [0]
    
    #Loop throguh remaining dcf
    for index, dcf_ in enumerate(dcf):
        if index == 0:
            swap_rates[index] = (1/dcf_) - 1
        else:
            swap_rates[index] = (1-dcf_) / sum(dcf[0:index+1])

    return swap_rates

In [None]:
def swap_to_zero(swap_rates):
    """
    Helper function transforms sorted swap rates to zero rates
    """
    swap_rates = list(swap_rates)
    dcf = swap_to_dcf(swap_rates)
    zero_rates = dcf_to_zero(dcf)
    return zero_rates

In [None]:
def zero_to_swap(zero_rates):
    """
    Helper function transforms sorted zero rates to swap rates
    """
    zero_rates = list(zero_rates)
    dcf = zero_to_dcf(zero_rates)
    swap_rates = dcf_to_swap(dcf)
    return swap_rates

# Smith-Wilson Extrapolation

In [1]:
class smith_wilson:
    
    def __init__(self, input_rates, input_liquid, ufr, alpha=None, min_convergence_year=60, rate_type="zero"):
        """
        Assumptions
        --------------
        -CRA: subtracted before rates are inserted
        -VA: added before rates are inserted
        -n_coupons: currently only annual coupon paying instruments are supported!
        -zero_coupon rates are all assumed to be liquid up to LLP 
        -input_rates and input_liquid should match in length
        
        Inputs
        --------------
        input_rates: array
            Interest rates where position matches maturity in years (e.g. index position 0 is 1y, index position 1 is 2y...)
        input_liquid: array
            Boolean indicator whether respective rate is considered liquid
        ufr: float
            Ultimate forward rate for extrapolation
        alpha: float
            speed of convergence
        min_convergence_year: int
            convergence_point = max(min_convergence_year, last_liquid_point + 40)
        rate_type: string
            Either "zero" or "swap" depending on rate input
        """
        
        #Input checks
        assert len(input_rates) == len(input_liquid)
        assert rate_type in ["zero", "swap"]
        
        self.ufr = ufr
        self.lnUfr = log(1+ufr)
        self.rate_type = rate_type
        self.input_rates = input_rates
        self.input_liquid = input_liquid
        self.last_liquid_point = np.max(np.where(self.input_liquid==1)) + 1
        self.liquid_rates = self.input_rates[np.where(self.input_liquid==1)]
        self.liquid_maturities = np.where(self.input_liquid==1)[0] + 1
        self.convergence_point = max(min_convergence_year, self.last_liquid_point + 40)
        self.tau = 0.0001 #1bps
        
        #Dimensions of matrices
        self.M = int(self.last_liquid_point)
        self.N = len(self.liquid_maturities)
        
        #If no alpha is provided, it is optimized
        if alpha is not None:
            self.alpha = np.array(alpha)
        else:
            self.optimize_alpha()
            
        self.zero_curve = self.zero_curve(start=1, end=120)
        self.forward_curve = self.zero_curve(start=1, end=120)
        
    def X_matrix(self):
        """
        N x M matrix containing cash flows of N observable instruments and M payment dates.
        For zero rates where it is assumed that all rates up to the LLP are liquid this simplifies to N x N matrix
        """
        if self.rate_type == "zero":
            X = np.diag(np.repeat(1, self.N))
        else:
            X = np.zeros([self.N, self.M])
            for n in range(0, self.N):
                maturity_in_years = self.liquid_maturities[n]
                rate = self.liquid_rates[n]
                for m in range(0, maturity_in_years - 1):
                    X[n, m] = rate
                X[n, maturity_in_years - 1] = 1.0 + rate
        return X
        
    def t(self, j):
        """
        Generic function returns time
        """
        return j
        
    def t_vector(self):
        """
        M x 1 column vector containing maturities in years
        """
        return np.array([self.t(j) for j in range(1, self.M + 1)])
    
    def t_observed(self, n):
        """
        Returns maturity in years for n-th liquid instrument (1 to N)
        """
        return self.liquid_maturities[n-1]
    
    def t_vector_observed(self):
        """
        N x 1 column vector with observed maturities of liquid instruments in years
        """
        return np.array([self.t_observed(j) for j in range(1, self.N + 1)])
    
    def r_observed(self, n):
        """
        Returns interest rate for n-th liquid instrument (1 to N)
        """
        return self.liquid_rates[n-1]
        
    def wilson_function(self, i, j):
        """
        Wilson function -- essential for extrapolation
        """
        t = self.t(i)
        uj = self.t(j)
        return exp(-self.lnUfr * (t + uj)) * (self.alpha * min(t, uj) - 0.5 * exp(-self.alpha * max(t, uj)) * (exp(self.alpha * min(t, uj)) - exp(-self.alpha * min(t, uj))))
        
    def W_matrix(self):
        """
        M x M matrix
        """
        W_matrix = np.array([[self.wilson_function(i, j) for j in range(1, self.M + 1)] for i in range(1, self.M + 1)])
        
        #Optimization leads to strange shape -- need for further investigation
        return W_matrix.reshape(self.M, self.M)
    
    def mu(self, i):
        """
        Ultimate forward rate discount factors
        """
        return exp(-self.lnUfr * self.t(i))
    
    def mu_vector(self):
        """
        M x 1 column vector with ufrs discount factors
        """
        return np.array([self.mu(i) for i in range(1, self.M + 1)])
    
    def Q_matrix(self):
        """
        M x N aux matrix
        """
        return np.matmul(np.diag(self.mu_vector()), self.X_matrix().T)
    
    def m(self, n):
        """
        Observed zero coupon / swap prices
        """
        if self.rate_type == "zero":
            return ((1 + self.r_observed(n)) ** (-self.t_observed(n)))
        else:
            #Par swap rates by construction have a value of 1.0 units
            return 1.0
        
    def m_vector(self):
        """
        N x 1 column vector with bond / swap prices
        """
        return np.array([self.m(i) for i in range(1, self.N + 1)])
    
    def zeta_vector(self):
        """
        N x 1 column vector with zeta
        """
        left_matrix = np.linalg.inv(np.matmul(self.X_matrix(), np.matmul(self.W_matrix(), self.X_matrix().T)))
        right_matrix = self.m_vector() - np.matmul(self.X_matrix(), self.mu_vector())
        return np.matmul(left_matrix, right_matrix)
    
    def zeta(self, i):
        """
        Zeta for asset
        """
        return self.zeta_vector()[i-1]
    
    def quasi_constant_k(self):
        Q = self.Q_matrix()
        zeta_vec = self.zeta_vector()
        t_vec = self.t_vector()
        alpha = self.alpha
        return (1+alpha * np.matmul(t_vec.T, np.matmul(Q, zeta_vec))) / (np.matmul(np.sinh(alpha*t_vec.T), np.matmul(Q, zeta_vec)))
    
    def g_alpha(self):
        return self.alpha / abs(1 - self.quasi_constant_k() * exp(self.alpha * self.convergence_point))
    
    def error_function(self, alpha):
        """
        Minimize alpha given two constraints:
        1) alpha >= 0.05
        2) g_alpha() <= tau (certain precision)
        """
        self.alpha = np.array(alpha)
        return alpha
    
    def constraint_precision(self, alpha):
        """
        g_alpha() <= tau (certain precision)
        """
        self.alpha = np.array(alpha)
        return self.tau - self.g_alpha()
    
    def optimize_alpha(self):
        """
        Minimize alpha given two constraints:
        1) alpha >= 0.05
        2) g_alpha() <= tau (certain precision)
        """
        
        bounds = [(0.05, None)]
        constraints = ({"type": "ineq", "fun": self.constraint_precision}) #>=0
        
        optimize_result = minimize(fun=self.error_function,
                                  x0=np.array(0.2),
                                  method="SLSQP",
                                  constraints=constraints,
                                  bounds=bounds,
                                  tol=0.0001,
                                  options={"disp":False})
        optimized_alpha = optimize_result.x
        self.alpha = optimized_alpha
        
    def cash_flow(self, i, j):
        """
        Returns cash flow of i-th asset (1 to N) and j-th payment date (1 to M) from X_matrix
        """
        X = self.X_matrix()
        return X[i-1, j-1]
    
    def pricing(self, t):
        """
        Pricing function for zero coupon bond with maturity t in years
        """
        if self.rate_type == "zero":
            return self.mu(t) + sum(self.zeta(j) * self.wilson_function(t, j) for j in range(1, self.N +1))
        else:
            total_sum = 0.0
            for i in range(1, self.N + 1):
                m_sum = 0.0
                zeta_sum = self.zeta(i)
                for j in range(1, self.M + 1):
                    m_sum = m_sum + self.cash_flow(i, j) * self.wilson_function(t, self.t(j))
                total_sum = total_sum + zeta_sum * m_sum
            return total_sum + self.mu(t)
    
    def zero_rate(self, t):
        """
        Returns extrapolated zero rate for maturity t in years
        """
        return (1 / self.pricing(t)) ** (1 / self.t(t)) - 1
    
    def zero_curve(self, start=1, end=120):
        """
        Returns extrapolated zero curve
        """
        return np.array([self.zero_rate(year) for year in range(start, end + 1)])
    
    def forward_rate(self, t):
        """
        Returns extrapolated forward rate for maturity t in years
        """
        return self.pricing(t-1) / self.pricing(t) - 1
    
    def forward_curve(self, start=1, end=120):
        """
        Returns extrapolated forward curve
        """
        return np.array([self.forward_rate(year) for year in range(start, end + 1)])

In [None]:
def sm_extrapolation(input_rates, input_liquid, ufr, alpha=None, min_convergence_year=60, rate_type="swap", va=0):
    """
    ATTENTION - misuse of object oriented approch to create quick and dirty wrapper function
    
    """
    #Create basic risk free term structure
    basic_curve_object = smith_wilson(input_rates, input_liquid, ufr=ufr, alpha=alpha, min_convergence_year=min_convergence_year, rate_type=rate_type)
    extrapolated_zero_curve = basic_curve_object.zero_curve().flatten()
    
    #Add va if required
    if va != 0:
        llp = basic_curve_object.last_liquid_point
        spot_input_for_va_curve = extrapolated_zero_curve + va/10000.0
        liquid_rates_va = np.repeat(0,len(spot_input_for_va_curve))
        liquid_rates_va[0:llp] = 1.0
        
        #Create rate object with zero rates input adjusted by va
        basic_curve_object_va = smith_wilson(spot_input_for_va_curve, liquid_rates_va, ufr=ufr, alpha=alpha, min_convergence_year=min_convergence_year, rate_type="zero")

        #Get extrapolated s2 curves with va adjustment
        extrapolated_zero_curve = basic_curve_object_va.zero_curve().flatten()
        
    return extrapolated_zero_curve

# Alternative extrapolation method

In [None]:
def compute_fwd_interpolation(fwd, swap_rate_end, zero_rates_known, start_tenor, end_tenor):
    """
    Interpolates forward rates as described by EIOPA in 
    "Background Document On The Optinion On The 2020 Revicew Of Solvency II" p.787f
    """
    left_side_1 = sum([1. / ((1+rate)**(t+1)) for t, rate in enumerate(zero_rates_known)])
    left_side_2 = sum([1. / ((1+fwd)**t) for t in range(1, end_tenor - start_tenor + 1)])
    left_side = left_side_1 + (1. / (1+zero_rates_known[-1])**start_tenor) * left_side_2
    right_side = 1. / ((1+zero_rates_known[start_tenor-1])**(start_tenor) * (1+fwd)**(end_tenor - start_tenor)) 
    return swap_rate_end * left_side + right_side

In [None]:
def error_fwd_interpolation(fwd, args):
    """
    Error function for numeric procedure required to interpolate between observed liquid market rates
    """
    swap_rate_end = args[0]
    zero_rates_known = args[1]
    start_tenor = args[2]
    end_tenor = args[3]     
    res = compute_fwd_interpolation(fwd, swap_rate_end, zero_rates_known, start_tenor, end_tenor)
    return np.abs(res-1)

In [None]:
def interpolate_fwd(fwd, swap_rate_end, zero_rates_known, start_tenor, end_tenor):
    """
    Interpolates forward rates according to methodology described in "EIOPA - Background Analysis p. 788"
    
    Parameters
    ----------
    fwd: 1 dimensional Ndarray
        fwd to be calculated
    swap_rate_end: DataFrame
        final swap rate. E.g. for 15y --> 20y period this would be 20y swap rate
    zero_rates_known: 1 dimensional Ndarray
        Zero rates known so far
    start_tenor: float
        last known tenor
    end_tenor: float
        final tenor (end of interpolation)
    
    Returns
    -------
    final_fwd: 1 dimensional Ndarray
        fwd that can be used for calculation of all zero rates in between the observed swap rates
    """
    #Optimization tolerance
    TOLERANCE = 1e-10
            
    #Number of assets
    number_of_fwds = len(fwd)
            
    #Long only weights to be assigned
    bound = (-1.0, 1.0)
    bounds = tuple(bound for asset in range(number_of_fwds))
    
    # Optimisation (minimize error)
    optimize_result = minimize(fun=error_fwd_interpolation,
                               x0=fwd,
                               args=[swap_rate_end, zero_rates_known, start_tenor, end_tenor],
                               method='SLSQP',
                               bounds=bounds,
                               tol=TOLERANCE,
                               options={'disp': False})

    # Recover the weights from the optimised object
    final_fwd = optimize_result.x

    return final_fwd

In [None]:
def compute_interpolated_zero(swap_rates_market, liquid_maturities):
    """
    Based on swap rates with maturity of 1-12, 15, 20, 25, 30, 40 and 50 years this function builds the forwards and zero rates.
    Methodology as described by EIOPA in "Background Document On The Optinion On The 2020 Revicew Of Solvency II" p.787f.
    
    Parameters
    ----------
    swap_rates_market: 1 dimensional Ndarray
        market rates where index position indicates maturity (e.g. [4] corresponds to a maturity of 5y)
    liquid_maturities: 1 dimensional Ndarray
        maturities of the respective interest rates
    
    Returns
    -------
    zero_rates_market_interpolated: list
        contains interpolated zero curve based on liquid swap rates
    """
    
    #Creates dummy to start interpolation
    fwd_dummy = np.array([0.01])
    
    #Number of liquid maturities
    N = len(liquid_maturities)
    
    #Construct zero rates
    zero_rates_market = np.array(pyData.swap_to_zero(swap_rates_market))
    
    #Starting point of zero curve with interpolation
    zero_rates_market_interpolated = [zero_rates_market[0]]
    
    #Loop through each liquid rate pair and see whether interpolation is required
    for liquid_idx in range(1, N):
        
        t1 = liquid_maturities[liquid_idx]
        t2 = liquid_maturities[liquid_idx-1]
        t = t1 - t2
        
        if t > 1:

            fwd = interpolate_fwd(fwd=fwd_dummy, swap_rate_end=swap_rates_market[t1-1], zero_rates_known=np.array(zero_rates_market_interpolated), start_tenor=t2, end_tenor=t1)
            z = (1+zero_rates_market_interpolated[-1])**t2
                    
            for zero_runs in range(1, t+1):
                z_temp = (z * (1+fwd)**zero_runs)**(1./(t2+zero_runs))
                zero_rates_market_interpolated.append(z_temp[0]-1.)
                
        else:
            #Assuming this can only happen for short maturities in the beginning
            zero_rates_market_interpolated.append(zero_rates_market[t1-1])
                
    return zero_rates_market_interpolated

In [None]:
def extract_fwds(zero_rates_market_interpolated, liquid_maturities, fsp=20):
    """
    Extracts main (interpolated) forwards.
    Methodology as described by EIOPA in "Background Document On The Optinion On The 2020 Revicew Of Solvency II" p.787ff.
    
    Parameters
    ----------
    zero_rates_market_interpolated: 1 dimensional Ndarray
        interpolated zero rates where index position indicates maturity (e.g. [4] corresponds to a maturity of 5y)
    liquid_maturities: 1 dimensional Ndarray
        maturities of the respective interest rates
    fsp: float
        maturity in years of first smoothing point 
    
    Returns
    -------
    forwards_pre_fsp: list
        forward rates based on main liquid interpolated rates before fsp
    forwards_llfr: list
        forward rates based on main liquid interpolated rates required for llfr calculation
    """
    
    #Number of liquid maturities
    N = len(liquid_maturities)
    
    #Init
    forwards_pre_fsp = [zero_rates_market_interpolated[0]]
    forwards_llfr = []
    
    #Loop through each liquid rate pair and calculate required fwds
    for liquid_idx in range(1, N):
        
        t1 = liquid_maturities[liquid_idx]
        t2 = liquid_maturities[liquid_idx-1]
        t = t1 - t2
        
        if t1 <= fsp:
            fwd = ((1+zero_rates_market_interpolated[t1-1])**t1 / (1+zero_rates_market_interpolated[t2-1])**t2)**(1/t) - 1    
            forwards_pre_fsp.append(fwd)
        
        if t1 == fsp:
            fwd = ((1+zero_rates_market_interpolated[t1-1])**t1 / (1+zero_rates_market_interpolated[t2-1])**t2)**(1/t) - 1    
            forwards_llfr.append(fwd)
        
        if t1 > fsp:
            t = t1 - fsp
            fwd = ((1+zero_rates_market_interpolated[t1-1])**t1 / (1+zero_rates_market_interpolated[fsp-1])**fsp)**(1/t) - 1    
            forwards_llfr.append(fwd)
            
    return forwards_pre_fsp, forwards_llfr

In [None]:
def compute_llfr(fwd, volume=np.array([3.3, 1.45 , 6, 0.3, 0.4]), va=0.0):
    """
    Calculates last liquid forward rate (llfr) as described by EIOPA in 
    "Background Document On The Optinion On The 2020 Revicew Of Solvency II" p.788f

    volume[0] = 20y
    volume[1] = 25y
    volume[2] = 30y
    volume[3] = 40y
    volume[4] = 50y
    fwd[0] = 15y-20y --> 15y5y + VA
    fwd[1] = 20y-25y --> 20y5y
    fwd[2] = 20y-30y --> 20y10y
    fwd[3] = 20y-40y --> 20y20y
    fwd[4] = 20y-50y --> 20y30y
    """
    weight = volume / volume.sum()
    fwds_incl_va = fwd.copy()
    fwds_incl_va[0] = fwds_incl_va[0] + va / 10000.0
    llfr = fwds_incl_va * weight
    return np.array([llfr.sum()])

In [None]:
def compute_curve_with_va(forwards_pre_fsp, liquid_maturities, fsp=20, va=0):
    """
    Constructs zero curve from forwards until FPS  - potentially including VA up .
    Methodology as described by EIOPA in "Background Document On The Optinion On The 2020 Revicew Of Solvency II" p.788f.
    
    Parameters
    ----------
    forwards_pre_fsp: 1 dimensional Ndarray
        forwards between liquid rates
    liquid_maturities: 1 dimensional Ndarray
        maturities of the respective interest rates
    fsp: float
        maturity in years of first smoothing point 
    va: float
        volatility adjustment
    
    Returns
    -------
    zero_rates: list
        contains interpolated zero curve up to a maturity of 20y including va
    """
    
    #Number of liquid maturities smaller than or equal to fsp
    N = len(liquid_maturities[liquid_maturities<=fsp])
    
    #Input check
    assert N == len(forwards_pre_fsp)
    
    #Add va to all forwards
    forwards_pre_fsp_incl_va = forwards_pre_fsp.copy() 
    forwards_pre_fsp_incl_va = [fwd_rate + va / 10000.0 for fwd_rate in forwards_pre_fsp_incl_va]
    
    #Init
    zero_rates = [forwards_pre_fsp_incl_va[0]]
    
    #Loop through each liquid rate pair and calculate required fwds
    for liquid_idx in range(1, N):
        
        t1 = liquid_maturities[liquid_idx]
        t2 = liquid_maturities[liquid_idx-1]
        t = t1 - t2
        
        if t > 1:
            
            fwd = (1+forwards_pre_fsp_incl_va[liquid_idx])
            base_rate = ((1+zero_rates[-1])**t2)
            
            for zero_run in range(1, t+1):
                z = base_rate * fwd**zero_run
                z = z**(1/(t2+zero_run)) - 1
                zero_rates.append(z)
            
        else:
            z = ((1+zero_rates[-1])**t2) * ((1+forwards_pre_fsp_incl_va[liquid_idx])**t)
            z = z**(1/t1) - 1
            zero_rates.append(z)

    return zero_rates

In [None]:
def big_b(h, alpha=0.1):
    """
    Helper function
    """
    top = 1 - np.exp(-alpha*h)
    bottom = alpha * h
    return top / bottom

In [None]:
def extrapolate_fwds(h, ufr, llfr, alpha=0.10):
    """
    Methodology as described by EIOPA in "Background Document On The Optinion On The 2020 Revicew Of Solvency II" p.789.

    Parameters
    ----------
    h: 1 dimensional Ndarray
        tbd
    ufr: 1 dimensional Ndarray
        tbd
    llfr: 1 dimensional Ndarray
        tbd
    alpha: float
        tbd
            
    Returns
    -------
    fwd_fsp_fsp_plus_h: 1 dimensional Ndarray
        fwd that can be used for calculation of all zero rates in between the observed swap rates
    """
    fwd_fsp_fsp_plus_h = np.log(1+ufr) + (llfr - np.log(1+ufr)) * big_b(h, alpha)
    return fwd_fsp_fsp_plus_h

In [None]:
def extrapolate_zero(known_zero_rates, ufr, llfr, alpha=0.10, fsp=20):
    """
    Methodology as described by EIOPA in "Background Document On The Optinion On The 2020 Revicew Of Solvency II" p.790.
    """
    #FSP
    z_fsp = known_zero_rates[fsp-1]
    
    #Extrapolated zero rates
    extrapolated_zero_rates = known_zero_rates[0:fsp]
    
    #Regardless of fsp we want to calculate the extrapolated rates with a maturity of up to 120y
    up_to = 120 - fsp
    
    #Calculate extrapolated zero rates up to and including 120y maturity
    for h in range(1, up_to+1):
        z = np.exp((fsp*z_fsp+h*extrapolate_fwds(h, ufr, llfr, alpha))/(fsp+h)) - 1
        extrapolated_zero_rates.append(z[0])
            
    return extrapolated_zero_rates

In [None]:
def alternative_extrapolation(input_rates, input_liquid, ufr, fsp=20, alpha=None, va=0.0, volume_traded=np.array([3.3, 1.45 , 6, 0.3, 0.4])):
    """
    Wrapper function for alternative extrapolation method of SII curves. 
    Methodology as described by EIOPA in "Background Document On The Optinion On The 2020 Revicew Of Solvency II" p.783-790.
    """
    
    #Assign base variables
    liquid_maturities = np.where(input_liquid==1)[0] + 1
    liquid_rates_swap = input_rates[np.where(input_liquid==1)]
    
    #Interpolated liquid zero rates
    zero_rates_market_interpolated = compute_interpolated_zero(input_rates, liquid_maturities)
    
    #Introduction mechanism
    if alpha is None:
        rate_at_fsp = zero_rates_market_interpolated[fsp-1]
        alpha_interpolator = interp1d([-0.5, 0.5], [0.2, 0.1], kind='linear', bounds_error=False, fill_value=(0.2, 0.1))
        alpha = alpha_interpolator(rate_at_fsp)
    
    #Relevant forwards
    forwards_pre_fsp, forwards_llfr = extract_fwds(zero_rates_market_interpolated, liquid_maturities, fsp)
    
    #Last liquid forward rate
    llfr = compute_llfr(forwards_llfr, va=va)
    
    #Zero curve including potential volatility adjustment
    zero_curve_va = compute_curve_with_va(forwards_pre_fsp, liquid_maturities, fsp, va)
    
    #Extrapolated zero curve
    extrapolated_zero_curve = extrapolate_zero(zero_curve_va, ufr, llfr, alpha, fsp)
    
    return extrapolated_zero_curve

# EIOPA shocks

In [7]:
def get_eiopa_schock(method="standard"):
    
    #Input checks
    assert method in ("standard", "proposal")
    
    #Different schocks
    if method =="standard":
        
        floor = -100.0 # no floor
        
        up_relative_20 = [0.7, 0.7, 0.64, 0.59, 0.55, 0.52, 0.49, 0.47, 0.44, 0.42, 0.39, 0.37, 0.35, 0.34, 0.33, 0.31, 0.3, 0.29, 0.27, 0.26]
        down_relative_20 = [0.75, 0.65, 0.56, 0.5, 0.46, 0.42, 0.39, 0.36, 0.33, 0.31, 0.3, 0.29, 0.28, 0.28, 0.27, 0.28, 0.28, 0.28, 0.29, 0.29]
        up_absolute_20 = 0.0
        down_absolute_20 = 0.0
        
        up_relative_90 = 0.2
        down_relative_90 = 0.2
        up_absolute_60 = 0.0
        down_absolute_60 = 0.0
        
    if method == "proposal":
        
        floor = -0.0125
        
        up_relative_20 = [0.61, 0.53, 0.49, 0.46, 0.45, 0.41, 0.37, 0.34, 0.32, 0.3, 0.3, 0.3, 0.3, 0.29, 0.28, 0.28, 0.27, 0.26, 0.26, 0.25]
        down_relative_20 = [0.58, 0.51, 0.44, 0.4, 0.4, 0.38, 0.37, 0.38, 0.39, 0.4, 0.41, 0.42, 0.43, 0.44, 0.45, 0.47, 0.48, 0.49, 0.49, 0.5]
        up_absolute_20 = [0.0214, 0.0186, 0.0172, 0.0161, 0.0158, 0.0144, 0.013, 0.0119, 0.0112, 0.0105, 0.0105, 0.0105, 0.0105, 0.0102, 0.0098, 0.0098, 0.0095, 0.0091, 0.0091, 0.0088]
        down_absolute_20 = [0.0116, 0.0099, 0.0083, 0.0074, 0.0071, 0.0067, 0.0063, 0.0062, 0.0061, 0.0061, 0.006, 0.006, 0.0059, 0.0058, 0.0057, 0.0056, 0.0055, 0.0054, 0.0052, 0.005]
        
        up_relative_90 = 0.2
        down_relative_90 = 0.2
        up_absolute_60 = 0.0
        down_absolute_60 = 0.0
    
    #Define output matrix
    output = pd.DataFrame({"MATURITY": np.arange(0,120)+1, 
                           "DOWN_RELATIVE": np.nan, "DOWN_ABSOLUTE": np.nan, 
                           "UP_RELATIVE": np.nan, "UP_ABSOLUTE": np.nan, 
                           "FLOOR": floor})
    
    #Assignment
    output.iloc[0:20, 1] = down_relative_20
    output.iloc[0:20, 2] = down_absolute_20
    output.iloc[0:20, 3] = up_relative_20
    output.iloc[0:20, 4] = up_absolute_20
    
    #Specific points
    output.iloc[89:, 1] = down_relative_90
    output.iloc[59:, 2] = down_absolute_60
    output.iloc[89:, 3] = up_relative_90
    output.iloc[59:, 4] = up_absolute_60
    
    #Interpolate
    output = output.interpolate(kind="linear")
    
    return output

In [None]:
def apply_shock(extrapolated_zero_curve, method="standard"):
    """
    Calculation of absolute rate shocks in up & down scenario of S2.

    Parameters
    ----------
    extrapolated_zero_curve: 1 dimensional Ndarray
        basic risk free term structure
    method: str
        One can decide to apply the standard rate shocks or the shocks as proposed by EIOPA in 2020
            
    Returns
    -------
    rate_shock_abs: pd.DataFrame
        Contains absolute rate upward & downward rate shock
    """
    
    #Get relative & absolute shocks
    shocks = get_eiopa_schock(method=method)
    
    if method == "standard":
        
        #Only rates greater than 0% should be shocked
        rate_floor = 0.0
        rate_floor_idx = extrapolated_zero_curve > rate_floor
        
        #Calculate shocked curves
        s2_spot_down = extrapolated_zero_curve * (1-shocks["DOWN_RELATIVE"].values*rate_floor_idx*1)
        s2_spot_up = extrapolated_zero_curve + np.clip(extrapolated_zero_curve * (shocks["UP_RELATIVE"].values), 0.01, None)
        
    if method == "proposal":
        
        #Calculate shocked curves
        s2_spot_down = extrapolated_zero_curve * (1-shocks["DOWN_RELATIVE"].values) - shocks["DOWN_ABSOLUTE"].values
        s2_spot_up = extrapolated_zero_curve * (1+shocks["UP_RELATIVE"].values) + shocks["UP_ABSOLUTE"].values
    
    #Absolute rate shock
    rate_shock_down = s2_spot_down - extrapolated_zero_curve
    rate_shock_up = s2_spot_up - extrapolated_zero_curve
    
    #Output
    rate_shock_abs = pd.DataFrame({"abs_shock_down":rate_shock_down, "abs_shock_up":rate_shock_up})
    
    return rate_shock_abs