In [None]:
import pandas as pd
import numpy as np
from scipy.stats import norm
from scipy.interpolate import CubicSpline
import math
from scipy.interpolate import InterpolatedUnivariateSpline


# 1. LSVCalibrator - класс для калибровки для локально стохастической модели волатильности

In [None]:


class LSVCalibrator:
    def __init__(self, S0, a0, r, T, N, time_grid,sigma_dup,kappa=1.5, alpha=1.0, kernel_type='gaussian'):
        """
        Описание: Инициализация калибратора LSV модели.

        Параметры:
        S0 - начальная цена актива
        a0 - пятый калиброваный параметр из модели Хестона (волатильность волатильности в квадрате)
        r - безрисковая процентная ставка
        T - временной горизонт
        N - количество траекторий
        time_grid - временная сетка (массив временных точек)
        kappa - параметр ширины ядра (по умолчанию 1.5)
        alpha - начальный параметр масштабирования (по умолчанию 1.0)
        kernel_type - тип ядра ('gaussian' или 'epanechnikov')
        """
        self.S0 = S0
        self.r = r
        self.T = T
        self.N = N
        self.time_grid = time_grid
        self.sigma_dup=sigma_dup
        self.kappa = kappa
        self.alpha = alpha
        self.kernel_type = kernel_type


        self.f0 = S0
        self.a0 = a0

        # 0) Заглушка для f- траекторий, a.
        self.f_trajectories = np.zeros((len(self.time_grid), self.N))

        self.a_trajectories = np.zeros((len(self.time_grid), self.N))
        # Список локальной N-ой волатильности
        self.sigmas_N = []

    def get_f_trajectories(self):
        return self.f_trajectories


    def kernel_function(self, x):
        """Описание: Функция ядра для оценки плотности."""

        if self.kernel_type == 'gaussian':
            return norm.pdf(x)
        elif self.kernel_type == 'epanechnikov':
            return (15/16) * (1 - x**2) * (np.abs(x) <= 1)
        else:
            raise ValueError("Unknown kernel type. Use 'gaussian' or 'epanechnikov'")

    def compute_sigma_vsi(self, t, p0, c0, k_put,k_call):
        """
        Описание: вычисление параметра σ_VSI.

        Параметры:
        t - текущее время
        p0 - цены put опционов
        c0 - цены call опционов
        k_init - сетка страйков
        """
        if t == 0:
            return 0.0

        term1 = np.sum(p0/ k_put**2 *np.concatenate((np.array([0]).reshape(1,),np.diff(k_put)),axis=0))
        term2 = np.sum(c0/ k_call**2 *np.concatenate((np.array([0]).reshape(1,),np.diff(k_call)),axis=0))

        sigma_vsi_sq = (2 * np.exp(self.r * t) / t) * (term1 + term2)
        return np.sqrt(sigma_vsi_sq)

    def compute_bandwidth(self, t, sigma_vsi):
        """Описание: вычисление ширины ядра h_{t,N} ."""
        return self.kappa * self.f0 * sigma_vsi * np.sqrt(max(t, 0.25) * self.N**(-0.5))

    def density_estimate(self, x, data_points, h):
        """Описание: Ядерная оценка плотности."""
        return self.kernel_function((x - data_points)/h)/h

    def calc_sigma_t_k(self, f_curr_sel, a_curr_sel, k, dt, p0, c0, k_put,k_call):
        """
        Описание: один шаг калибровки.

        Параметры:
        t_k - текущее время
        sigma_dup - dup-волатильность для текущего времени
        p0 - цены put опционов
        c0 - цены call опционов
        k_call, k_put - сетка страйков для калов и путов
        """
        t_k=k*dt


        # Вычисление ширины ядра
        sigma_vsi = self.compute_sigma_vsi(t_k, p0, c0, k_put,k_call)
        h = self.compute_bandwidth(t_k, sigma_vsi)



        # Создаем функцию для расчета новой волатильности
        def loc_sigma(t,f):
            # if k*dt<=t<(k+1)*dt:
            dup=self.sigma_dup(t,f)
            # Числитель: сумма ядерных оценок
            numerator = np.sum([self.density_estimate(f, f_i, h) for f_i in f_curr_sel])

            # Знаменатель: сумма взвешенных квадратов волатильностей
            denominator = np.sum([a_i * self.density_estimate(f, f_i, h)
                                 for f_i, a_i in zip(f_curr_sel, a_curr_sel)])

            if denominator == 0:
                return dup  # fallback

            return dup * np.sqrt(numerator / denominator)

        return loc_sigma

    def euler_step(self, loc_sigma, f_prev, a_prev, dt, heston_params):
        """
        Описание: Шаг схемы Эйлера для модели Хестона.

        Параметры:
        f_prev - предыдущие значения форвардов
        sigma_prev - предыдущие значения волатильности
        dt - шаг по времени
        heston_params - параметры модели Хестона (kappa, theta, xi, rho)
        """
        kappa, theta, xi, rho = heston_params

        # Генерация коррелированных броуновских движений
        Z1 = np.random.normal(0, 1, self.N)
        Z2 = rho * Z1 + np.sqrt(1 - rho**2) * np.random.normal(0, 1, self.N)

        # Обновление волатильности a_ ~ a^2
        a_new = a_prev + kappa * (theta - a_prev) * dt + \
                   xi * np.sqrt(a_prev) * np.sqrt(dt) * Z2
        a_new = np.maximum(a_new, 0)  # Гарантируем положительность

        # Обновление форвардов
        f_new = f_prev * (1 + r*dt + loc_sigma*np.sqrt(a_prev) * np.sqrt(dt) * Z1)
        return f_new, a_new

    def sort_outliers(self,N, f_k, a_k, alpha=1e-3):
        #N-число путей, (f,a) - пары значений форварда и волы в момент времени t_k. alpha - параметр среза, в идеале N*alpha целое
        #возвращает два отсорченных без выбросов массивов пар f,a
        arr = np.zeros(len(f_k), dtype=[('keys', f_k.dtype), ('values', a_k.dtype)])
        arr['keys'] = f_k
        arr['values'] = a_k
        k = int(N * alpha)
        arr_sorted = np.sort(arr, order='keys')


        sorted_f = arr_sorted['keys']
        sorted_a = arr_sorted['values']
        return sorted_f[k:-k], sorted_a[k:-k]

    def random_G_f (self,N,t,f, a,N_f_min=15, N_f=30,seed=42):
        """Описание: случайны отсев данных.  """
        #f,a - отсорченны, |G_f|=max(N_f*sqrt(t),N_f_min)
        #возвращает равномерно без повторений |G_f| пар
        np.random.seed(seed)  # Для NumPy
        n = max(N_f*math.sqrt(t),N_f_min)
        indices = np.random.choice(N, size=n, replace=False)
        indices = [round(i) for i in indices]
        return f[indices], a[indices]

    def precise_stepped_indices(self,start, stop, step):
       n_elements = int((stop - start) // step) + 1
       return [start + i*step for i in range(n_elements) if start + i*step < stop]

    def quantile_G_f (self,N,t,f, a,N_f_min=15, N_f=20,seed=42):
       """Описание: квантильный отсев данных."""
       #f,a - отсорченны, |G_f|=max(N_f*sqrt(t),N_f_min)
       #возвращает поквантильные значения из списка
       np.random.seed(seed)
       n = max(N_f*math.sqrt(t),N_f_min)
       indices = self.precise_stepped_indices(0, N-1, n)
       indices = [round(i) for i in indices]
       return f[indices], a[indices]

    def full_calibration(self, sigma_dup_func, heston_params, p0_data, c0_data, k_put_init_data , k_call_init_data,sel_qual_data="quantil"):
        """
        Описание: Полная процедура калибровки.
        heston_params,  sigma_dup_func - волатильность дюпира, уже реализованный в виде функции.
        Данные подаем в таком виде, как тут написано, до этого их надо правильно разделить.

        Параметры:
        sigma_dup_func - функция dup-волатильности sigma_dup(t, f)
        heston_params - параметры модели Хестона (kappa, theta, xi, rho)
        p0_data - данные по ценам put опционов для каждого t_k
        c0_data - данные по ценам call опционов для каждого t_k
        k_put_init_data, k_call_init_data - данные по сетке страйков для каждого t_k

        """


        # Инициализация траектории начальными параметрами по времени
        self.f_trajectories[0] = self.f0

        self.a_trajectories[0] = self.a0

        # Массив для хранения функций локальной волатильности sigma_N (t,f) - для каждого отрезка [t_k,t_k+1] - своя функция
        self.sigmas_N = []

        # Проход по времени
        for k in range(len(self.time_grid)):
            t_k = self.time_grid[k]

            if k == 0:

                loc_sigma_k = lambda t,f: sigma_dup_func(t, f) / self.alpha
            else:
                # Выполняем шаг Эйлера
                dt = t_k - self.time_grid[k-1]
                self.f_trajectories[k], self.a_trajectories[k] = self.euler_step(loc_sigma_k(t_k,self.f_trajectories[k-1]),
                    self.f_trajectories[k-1], self.a_trajectories[k-1], dt, heston_params)

                # Считаем sigma_N(t_k,f_j) для фиксированного k и всех j=1,...,N  сразу

                if sel_qual_data=="quantil":
                      arr_f_k,arr_a_k=self.quantile_G_f(self.N,t_k, self.f_trajectories[k], self.a_trajectories[k])
                elif  sel_qual_data=="random":
                      arr_f_k,arr_a_k=self.random_G_f(self.N,t_k, self.f_trajectories[k], self.a_trajectories[k])

                loc_sigma_k = self.calc_sigma_t_k(arr_f_k,arr_a_k,k,dt,
                                               p0_data[k], c0_data[k], k_put_init_data[k],k_call_init_data[k])

            self.sigmas_N.append(loc_sigma_k)
            # после ввыполнения функции в атрибутах будут откалиброванные параметры



# 2. Выгрузка данных

In [None]:
df = pd.read_excel("/content/sigma_dupir.xlsx") # таблица для построения интерполяции волатильности Дюпира.
df_1 = pd.read_csv("/content/heston_option_prices (1).csv") # Таблица с ценами опционов.


# 3. Обработка входных таблиц, интерполированная $\sigma_{Dup}$

In [None]:
def prepare_for_dupir(dupir_df):
   """
   Описание:
   Функция по данным(dupir_df: страйкам, времени, значениям из формулы дюпира)
   интерполирует волатильность_дюпира вдоль страйка для фиксированных
   моментов времени из таблицы(dupir_df).

   Возвращает:
   T_vals(list),splines_by_T(list): отсортированный список значений по времени, соответствующий список сплайнов.
   """
   T_vals = np.sort(dupir_df['T'].dropna().unique())


   splines_by_T = {}

   for T in T_vals:
     sub = dupir_df[dupir_df['T'] == T]
     strikes = sub['strike'].values
     sigmas = sub['sigma_dup'].values


     idx = np.argsort(strikes)
     strikes_sorted = strikes[idx]
     sigmas_sorted = sigmas[idx]


     spline = InterpolatedUnivariateSpline(strikes_sorted, sigmas_sorted, k=3, ext='const')
     splines_by_T[T] = spline
   return T_vals,splines_by_T

def prepare_market_data(market_data_df):
    """
    Описание:
    Преобразует DataFrame с рыночными данными в формат, подходящий для
    функции full_calibration.

    Аргументы:
    market_data_df (pd.DataFrame): DataFrame с колонками
                                   ['T', 'flag', 'strike', 'price'].
                                   'T' - время до экспирации в годах.
                                   'flag' - тип опциона ('put' или 'call').
                                   'strike' - цена страйка.
                                   'price' - цена опциона.

    Возвращает:
    tuple: Кортеж, содержащий:
        - time_grid (np.ndarray): Отсортированный массив уникальных времен экспирации.
        - p0_data (list): Список массивов цен put-опционов для каждой экспирации.
        - c0_data (list): Список массивов цен call-опционов для каждой экспирации.
        - k_put_data (list): Список массивов страйков put-опционов для каждой экспирации.
        - k_call_data (list): Список массивов страйков call-опционов для каждой экспирации.
    """

    time_grid = np.sort(market_data_df['T'].unique())


    p0_data = []
    c0_data = []
    k_put_data = []
    k_call_data = []


    for t in time_grid:

        df_t = market_data_df[market_data_df['T'] == t]


        puts = df_t[df_t['flag'] == 'put'].sort_values(by='strike').reset_index(drop=True)
        calls = df_t[df_t['flag'] == 'call'].sort_values(by='strike').reset_index(drop=True)


        p0_data.append(puts['price'].to_numpy())
        k_put_data.append(puts['strike'].to_numpy())


        c0_data.append(calls['price'].to_numpy())
        k_call_data.append(calls['strike'].to_numpy())

    return time_grid, p0_data, c0_data, k_put_data, k_call_data

# второй вариант для немного другого формата данных.
def prepare_market_data_2(market_data_df):
    """
    Описание:
    Преобразует DataFrame с рыночными данными в формат, подходящий для
    функции full_calibration.

    Аргументы:
    market_data_df (pd.DataFrame): DataFrame с колонками
                                   ['maturity', 'call','put', 'strike'].
                                   'T' - время до экспирации в годах.
                                   'call','put' - цена опциона ('put' или 'call', альтернатива варианту
                                   с колонками:'flag','price').
                                   'strike' - цена страйка.

    Возвращает:
    tuple: Кортеж, содержащий:
        - time_grid (np.ndarray): Отсортированный массив уникальных времен экспирации.
        - p0_data (list): Список массивов цен put-опционов для каждой экспирации.
        - c0_data (list): Список массивов цен call-опционов для каждой экспирации.
        - k_put_data (list): Список массивов страйков put-опционов для каждой экспирации.
        - k_call_data (list): Список массивов страйков call-опционов для каждой экспирации.
    """

    time_grid = np.sort(market_data_df['maturity'].unique())


    p0_data = []
    c0_data = []
    k_put_data = []
    k_call_data = []

    for t in time_grid:

        df_t = market_data_df[market_data_df['maturity'] == t]


        sort_by_strike = df_t.sort_values(by='strike').reset_index(drop=True)



        p0_data.append(sort_by_strike['put'].to_numpy())
        k_put_data.append(sort_by_strike['strike'].to_numpy())


        c0_data.append(sort_by_strike['call'].to_numpy())
        k_call_data.append(sort_by_strike['strike'].to_numpy())

    return time_grid, p0_data, c0_data, k_put_data, k_call_data

In [None]:
S0 = 1
a0 = 0.1    # пятый калиброванный параметр  Хестона
r = 0.0045
T = 1.2
N = 50000  # Количество траекторий

T_vals,splines_by_T=prepare_for_dupir(df)


# волатильность дюпира на основе интерполяции (от времени экспирации и страйка)
def sigma_dup(T,K):
    # T — произвольное (float)
    # Находим ближайшее T в сетке (или делаем интерполяцию между слоями)
    T_nearest = T_vals[np.abs(T_vals - T).argmin()]
    spline = splines_by_T[T_nearest]
    return spline(K).astype(float)


time_grid, p0_data, c0_data, k_put_data, k_call_data= prepare_market_data_2(df_1)
# Создаем калибратор
calibrator = LSVCalibrator(S0, a0, r, T, N, time_grid,sigma_dup, kappa=1.5, kernel_type='gaussian')

# Параметры модели Хестона (kappa, theta, xi, rho)
heston_params = (3, 0.3, 0.5, -0.2)





# Запуск калибровки
calibrator.full_calibration(sigma_dup, heston_params, p0_data, c0_data,  k_put_data, k_call_data)

# запуск траекторий после калибровки
f_trajectories = calibrator.get_f_trajectories()

In [None]:
f_trajectories.shape

(70, 10000)

In [None]:
# функции для прайсинга

def call_eval(f,T,K,r):
  return np.exp(-r*T)*(np.maximum((f[int(T*f.shape[0]/1.5),:]-K),0)).mean()

def put_eval(f,T,K,r):
  return np.exp(-r*T)*np.maximum(-(f[int(T*f.shape[0]/1.5),:]-K),0).mean()

call_eval(f_trajectories,0.5, 1, r)

In [None]:
f_trajectories[20,:].mean()