In [1]:
import numpy as np
from abc import ABC, abstractmethod
from scipy.optimize import minimize
from scipy.optimize import newton, ridder, brentq
import matplotlib.pyplot as plt
from scipy.stats import multivariate_normal, norm
from scipy.interpolate import CubicSpline, RegularGridInterpolator
from mpl_toolkits.mplot3d import Axes3D
from sklearn.metrics import mean_squared_error
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Input
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import EarlyStopping
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
from xgboost import XGBRegressor
import plotly.graph_objs as go
import plotly.subplots as sp

In [2]:
class HestonModel:
    """Класс для реализации модели Хестона"""
    def __init__(self, S0: float, v0: float, kappa: float, theta: float, sigma: float, rho: float, r: float = 0.0):
        """
        Инициализация параметров модели Хестона
        :param S0: Начальная цена базового актива
        :param v0: Начальная волатильность
        :param kappa: Скорость возврата (mean reversion)
        :param theta: Долгосрочная средняя волатильность
        :param sigma: Волатильность волатильности (vol of vol)
        :param rho: Корреляция между движениями цены и волатильности
        :param r: Безрисковая процентная ставка
        """
        self.S0 = S0
        self.v0 = v0
        self.kappa = kappa
        self.theta = theta
        self.sigma = sigma
        self.rho = rho
        self.r = r

    def calculate_option_price(self, K: float, T: float, num_paths: int = 10000, num_steps: int = 100) -> float:
        """Расчет цены опциона методом Монте-Карло"""
        # Реализовать метод Монте-Карло или аналитическое решение
        paths, _ = self.simulate_paths(num_paths, num_steps, T)
        ST = paths[:, -1]
        payoff = np.maximum(ST - K, 0)
        return np.exp(-self.r * T) * np.mean(payoff)
    
    def black_scholes_price(S: float, K: float, T: float, r: float, sigma: float) -> float:
        """Расчет цены колл-опциона по Блэку-Шоулзу"""
        d1 = (np.log(S/K) + (r + 0.5*sigma**2)*T) / (sigma*np.sqrt(T))
        d2 = d1 - sigma*np.sqrt(T)
        return S*norm.cdf(d1) - K*np.exp(-r*T)*norm.cdf(d2)

    def calculate_implied_volatility(self, K: float, T: float, num_paths: int = 10000, num_steps: int = 100) -> float:
        """Расчет подразумеваемой волатильности"""

        price = self.calculate_option_price(K, T, num_paths, num_steps)

        intrinsic = max(self.S0 - K * np.exp(-self.r * T), 0)
        
        if price < intrinsic - 1e-5:
            return np.nan
        if price > self.S0:
            return np.nan
        
        if price <= 1e-8:
            return 0.0

        def iv_obj(sigma):
            bs_price = HestonModel.black_scholes_price(self.S0, K, T, self.r, sigma)
            print('bsh : ', bs_price, 'sigma : ', sigma)
            return bs_price - price

        try:
            # Автоматический подбор границ
            low_vol, high_vol = 0.001, 5.0
            max_tries = 10
            
            while iv_obj(low_vol) * iv_obj(high_vol) > 0 and max_tries > 0:
                low_vol /= 2
                high_vol *= 2
                max_tries -= 1
            
            if iv_obj(low_vol) * iv_obj(high_vol) > 0:
                return np.nan
                
            iv = brentq(iv_obj, low_vol, high_vol, xtol=1e-6, maxiter=50)
            return max(iv, 0)
        
        except (RuntimeError, ValueError):
            return np.nan
    
    def simulate_paths(self, num_paths: int, num_steps: int, T: float) -> np.ndarray:
        """Генерация путей цены"""
        # Реализовать схему симуляции (например, Euler-Maruyama)
        dt = T / num_steps
        S = np.zeros((num_paths, num_steps + 1))
        v = np.zeros((num_paths, num_steps + 1))
        S[:, 0] = self.S0
        v[:, 0] = self.v0

        cov = np.array([[1, self.rho], [self.rho, 1]])
        for t in range(1, num_steps + 1):
            dw = multivariate_normal.rvs(mean=[0, 0], cov=cov, size=num_paths)
            dw1 = dw[:, 0] * np.sqrt(dt)
            dw2 = dw[:, 1] * np.sqrt(dt)

            v[:, t] = np.maximum(v[:, t-1] + self.kappa * (self.theta - v[:, t-1]) * dt  + self.sigma * np.sqrt(np.abs(v[:, t-1])) * dw2, 0)
            S[:, t] = S[:, t-1] * np.exp((self.r - 0.5 * v[:, t-1]) * dt + np.sqrt(np.abs(v[:, t-1])) * dw1)

        return S, v
    
    def plot_simulation(self, num_paths: int, num_steps: int, T: float):
        """Визуализация траекторий цен и волатильности"""
        S_paths, v_paths = self.simulate_paths(num_paths, num_steps, T)
        fig = sp.make_subplots(rows=2, cols=1, subplot_titles=('Траектории цен базового актива', 'Траектории волатильности'))
        for i in range(num_paths):
            fig.add_trace(go.Scatter(x=list(range(num_steps)), y=S_paths[i], mode='lines', name=f'Path {i+1}'), row=1, col=1)

        for i in range(num_paths):
            fig.add_trace(go.Scatter(x=list(range(num_steps)), y=v_paths[i], mode='lines', name=f'Path {i+1}'), row=2, col=1)

        fig.update_layout(height=600, title_text='Визуализация траекторий', showlegend=False)
        fig.update_xaxes(title_text='Временные шаги', row=1, col=1)
        fig.update_yaxes(title_text='Цена', row=1, col=1)
        fig.update_xaxes(title_text='Временные шаги', row=2, col=1)
        fig.update_yaxes(title_text='Волатильность', row=2, col=1)
        fig.show()

In [3]:
model = HestonModel(S0=100.0, v0=0.04, kappa=2.0, theta=0.04, sigma=0.5, rho=-0.7)
model.plot_simulation(num_paths=100, num_steps=1000, T=1.0)

In [4]:
class VolatilitySurface:
    """Класс для работы с поверхностью волатильности"""
    def __init__(self, strikes: np.ndarray, maturities: np.ndarray):
        """
        :param strikes: Массив страйков
        :param maturities: Массив сроков до экспирации
        """
        self.strikes = strikes
        self.maturities = maturities
        self.surface = np.zeros((len(strikes), len(maturities)))
        self.model = None
        self.interpolator = None
    
    def build_from_heston(self, model: HestonModel):
        """Построение поверхности из модели Хестона"""
        self.model = model
        for i, K in enumerate(self.strikes):
            for j, T in enumerate(self.maturities):
                self.surface[i, j] = model.calculate_implied_volatility(K, T)
                print(self.surface[i, j])
        self._build_interpolator()

    def build_from_market_data(self, market_vols):
        self.surface = market_vols
        self._build_interpolator()

    def _build_interpolator(self):
        self.interpolator = RegularGridInterpolator(
            (self.strikes, self.maturities),
            self.surface,
            method='linear',
            bounds_error=False,
            fill_value=None
        )
    
    def get_vol(self, strike, maturity):
        if self.interpolator is None:
            raise RuntimeError('surface is not built')
        return self.interpolator([[strike, maturity]])[0]
    
    def plot_surface(self, title="Поверхность волатильности"):
        """Визуализация поверхности волатильности"""
        
        if self.interpolator is None:
            raise RuntimeError("Поверхность не построена")
            
        K_grid, T_grid = np.meshgrid(
            np.linspace(min(self.strikes), max(self.strikes), 20),
            np.linspace(min(self.maturities), max(self.maturities), 20)
        )
        
        vol_grid = np.zeros_like(K_grid)
        for i in range(K_grid.shape[0]):
            for j in range(K_grid.shape[1]):
                vol_grid[i, j] = self.get_vol(K_grid[i, j], T_grid[i, j])

        fig = go.Figure(data=[go.Surface(z=vol_grid, x=K_grid, y=T_grid, colorscale='Viridis')])
        
        fig.update_layout(
            title=title,
            scene=dict(
                xaxis_title='Страйк',
                yaxis_title='Срок до экспирации',
                zaxis_title='Волатильность'
            ),
            coloraxis_colorbar=dict(title='Волатильность')
        )

        fig.show()
    
    def calibrate(self, market_vols: np.ndarray, calibrator: 'Calibrator'):
        """Калибровка поверхности к рыночным данным"""
        calibrator.calibrate(self, market_vols)
        self._build_interpolator()

    def calc_metrics(self, market_vols):
        if market_vols.shape != self.surface.shape:
            raise ValueError("Несоответствие размеров данных")
            
        errors = self.surface - market_vols
        return {
            'mse': np.mean(errors**2),
            'mae': np.mean(np.abs(errors)),
            'max_error': np.max(np.abs(errors)),
            'relative_mse': np.mean((errors / market_vols)**2)
        }

In [5]:
class Calibrator(ABC):
    """Абстрактный базовый класс для калибровщиков"""
    @abstractmethod
    def calibrate(self, surface: VolatilitySurface, market_vols: np.ndarray):
        pass

class MSECalibrator(Calibrator):
    """Калибровка минимизацией MSE с улучшенной стабильностью"""
    def __init__(self, max_iter=100, tol=1e-6, verbose=True, params_bounds=None, fixed_params=None):
        self.max_iter = max_iter
        self.tol = tol 
        self.verbose = verbose
        self.params_bounds = params_bounds or {
            'v0': (1e-5, 0.5), 
            'kappa': (1e-5, 5.0),
            'theta': (1e-5, 0.3),
            'sigma': (1e-5, 1.0),
            'rho': (-0.95, 0.95)
        }
        self.fixed_params = fixed_params or {}
        self.opt_res = None
        self.param_scales = {
            'v0': 0.1,
            'kappa': 1.0,
            'theta': 0.1,
            'sigma': 0.1,
            'rho': 1.0
        }

    def calibrate(self, surface: VolatilitySurface, market_vols: np.ndarray):
        """Оптимизация параметров модели для минимизации MSE"""
        if market_vols.shape != surface.surface.shape:
            raise ValueError("Размерность рыночных данных не соответствует поверхности")
        
        if surface.model is None:
            raise ValueError("Поверхность должна быть построена из модели Хестона для калибровки")
        
        model = surface.model
        init_guess, bounds, param_names = [], [], []

        for param in ['v0', 'kappa', 'theta', 'sigma', 'rho']:
            if param not in self.fixed_params:
                raw_value = getattr(model, param)
                scaled_value = raw_value / self.param_scales[param]
                init_guess.append(scaled_value)
                low, high = self.params_bounds[param]
                scaled_low = low / self.param_scales[param]
                scaled_high = high / self.param_scales[param]
                bounds.append((scaled_low, scaled_high))
                param_names.append(param)

        best_params = None
        best_loss = float('inf')

        def loss_function(scaled_params):
            nonlocal best_params, best_loss
            
            params = []
            param_idx = 0
            for param in ['v0', 'kappa', 'theta', 'sigma', 'rho']:
                if param in self.fixed_params:
                    params.append(self.fixed_params[param])
                else:
                    raw_value = scaled_params[param_idx] * self.param_scales[param]
                    low, high = self.params_bounds[param]
                    clipped_value = np.clip(raw_value, low, high)
                    params.append(clipped_value)
                    param_idx += 1
            
            v0, kappa, theta, sigma, rho = params
            model.v0 = v0
            model.kappa = kappa
            model.theta = theta
            model.sigma = sigma
            model.rho = rho
            
            try:
                surface.build_from_heston(model)
                residuals = surface.surface - market_vols
                
                valid_mask = ~np.isnan(market_vols)
                mse = np.mean(residuals[valid_mask]**2)
                
                if mse < best_loss:
                    best_loss = mse
                    best_params = params.copy()
                    
                return mse
                
            except Exception as e:
                if self.verbose:
                    print(f"Ошибка расчета: {str(e)}")
                return 1e10 

        if self.verbose:
            from scipy.optimize import check_grad
            grad_error = check_grad(
                lambda x: loss_function(x), 
                lambda x: np.zeros_like(x),
                np.array(init_guess))
            print(f"Ошибка градиента: {grad_error:.6f}")

        try:
            result = minimize(
                loss_function,
                np.array(init_guess),
                method='L-BFGS-B',
                bounds=bounds,
                options={
                    'maxiter': self.max_iter,
                    'ftol': self.tol,
                    'disp': self.verbose,
                    'maxls': 50
                }
            )
        except Exception as e:
            if best_params is not None:
                result = type('', (), {})()
                result.x = best_params
                result.fun = best_loss
                result.success = True
                result.message = "Used best params after exception"
            else:
                raise RuntimeError(f"Ошибка оптимизации: {str(e)}")

        if best_params is not None:
            v0, kappa, theta, sigma, rho = best_params
            model.v0 = v0
            model.kappa = kappa
            model.theta = theta
            model.sigma = sigma
            model.rho = rho
            surface.build_from_heston(model)
        
        if self.verbose:
            print("Результаты калибровки:")
            print(f"Финальный MSE: {best_loss:.6f}")
            print("Оптимальные параметры:")
            print(f"v0 = {model.v0:.6f}")
            print(f"kappa = {model.kappa:.6f}")
            print(f"theta = {model.theta:.6f}")
            print(f"sigma = {model.sigma:.6f}")
            print(f"rho = {model.rho:.6f}")

        return best_loss

In [6]:
heston_params = {
        'S0': 100.0,
        'v0': 0.04,
        'kappa': 1.0,
        'theta': 0.04,
        'sigma': 0.2,
        'rho': -0.7,
        'r': 0.25
    }

heston_model = HestonModel(**heston_params)

strikes = np.array([80, 90, 100, 110, 120])
maturities = np.array([0.25, 0.5, 1.0])

vol_surface = VolatilitySurface(strikes, maturities)
vol_surface.build_from_heston(heston_model)
vol_surface.plot_surface()

bsh :  24.846954974921942 sigma :  0.001
bsh :  81.75366541881266 sigma :  5.0
bsh :  24.846954974921942 sigma :  0.001
bsh :  81.75366541881266 sigma :  5.0
bsh :  24.846954974921942 sigma :  0.001
bsh :  81.75366541881266 sigma :  5.0
bsh :  24.846954974921942 sigma :  0.004516002742627224
bsh :  54.369140344369725 sigma :  2.5022580013713136
bsh :  24.846954974921942 sigma :  0.00790232763800606
bsh :  36.10170205491602 sigma :  1.2550801645046599
bsh :  24.846954974921942 sigma :  0.012337619407686766
bsh :  27.576984522822798 sigma :  0.6337088919561733
bsh :  24.846954974921942 sigma :  0.02144751444726143
bsh :  25.079467643309002 sigma :  0.3275782032017174
bsh :  24.846954974921942 sigma :  0.07414498844038678
bsh :  24.852656836387112 sigma :  0.2008615958210521
bsh :  24.909139039969077 sigma :  0.26421989951138475
bsh :  24.87628931708187 sigma :  0.2393630217694885
bsh :  24.885182606728804 sigma :  0.24745235249509298
bsh :  24.887028704446834 sigma :  0.24896445013676022

In [7]:
market_volatilities = np.random.uniform(0.1, 0.4, size=(len(strikes), len(maturities)))

In [8]:
mse_calibrator = MSECalibrator()
vol_surface.calibrate(market_volatilities, mse_calibrator)

nan
nan
bsh :  37.69593735428761 sigma :  0.001
bsh :  99.02324071903134 sigma :  5.0
bsh :  37.69593735428761 sigma :  0.001
bsh :  99.02324071903134 sigma :  5.0
bsh :  37.69593735428761 sigma :  0.001
bsh :  99.02324071903134 sigma :  5.0
bsh :  37.69593735428761 sigma :  0.01692891190251977
bsh :  83.61875478638768 sigma :  2.50846445595126
bsh :  37.69593735428761 sigma :  0.027531097550168464
bsh :  59.56561352476963 sigma :  1.267997776750714
bsh :  37.69593735428761 sigma :  0.038615176048229075
bsh :  44.5545792050345 sigma :  0.6533064763994715
bsh :  37.69593735428761 sigma :  0.05612879045538316
bsh :  38.86868869209267 sigma :  0.3547176334274273
bsh :  37.69594410896997 sigma :  0.10588238711365044
bsh :  37.82846500561531 sigma :  0.23030001027053887
bsh :  38.112928910481656 sigma :  0.2807570742421059
bsh :  37.87513667046097 sigma :  0.24145463120547872
bsh :  37.89216360459876 sigma :  0.2450658278704893
bsh :  37.89133022369352 sigma :  0.2448936713190901
bsh :  37.


 Line search cannot locate an adequate point after MAXLS
  function and gradient evaluations.
  Previous x, f and g restored.
 Possible causes: 1 error in function or gradient evaluation;
                  2 rounding error dominate computation.


bsh :  15.452824346787182 sigma :  0.001
bsh :  80.59729691601675 sigma :  5.0
bsh :  15.452824346787182 sigma :  0.001
bsh :  80.59729691601675 sigma :  5.0
bsh :  15.452824346787182 sigma :  0.001
bsh :  80.59729691601675 sigma :  5.0
bsh :  15.452824346787182 sigma :  0.03679221147053063
bsh :  51.52757627891381 sigma :  2.518396105735265
bsh :  15.452824676717128 sigma :  0.06887795271589998
bsh :  31.88284990718627 sigma :  1.2936370292255825
bsh :  15.453587228246889 sigma :  0.10364718466685346
bsh :  21.95098471988544 sigma :  0.6986421069462181
bsh :  15.477807539383903 sigma :  0.1462900012658209
bsh :  17.80187218482969 sigma :  0.42246605410601945
bsh :  15.624225738555921 sigma :  0.1987480930264524
bsh :  16.267914975358636 sigma :  0.2901282037767544
bsh :  15.86229130956228 sigma :  0.24063073004520907
bsh :  15.923923824393412 sigma :  0.24924435893677405
bsh :  15.919108993492486 sigma :  0.2485912362203981
bsh :  15.919250252284257 sigma :  0.24861044212206534
bsh : 

In [9]:
vol_surface.plot_surface()

In [10]:
class NeuralNetworkCalibrator(Calibrator):
    """Калибровка с использованием нейронной сети"""
    def __init__(self, input_dim: int, hidden_layers: list = [64, 32], batch_size = 32, epochs=200, validation_split=0.2):
        self.input_dim = input_dim
        self.hidden_layers = hidden_layers
        self.batch_size = batch_size
        self.epochs = epochs
        self.validation_split=validation_split
        self.x_scaler= StandardScaler()
        self.y_scaler = StandardScaler()
        
        self.model = self._build_model()
    
    def _build_model(self) -> Sequential:
        """Построение архитектуры нейронной сети"""

        model = Sequential()
        model.add(Input(shape=(self.input_dim, )))

        for units in self.hidden_layers:
            model.add(Dense(units, activation='relu'))
            model.add(Dense(units, activation='relu'))
        
        model.add(Dense(5, activation='linear'))
        model.compile(
            optimizer=Adam(learning_rate=0.001),
            loss='mse',
            metrics=['mae']
        )
        return model
    
    def calibrate(self, surface, market_vols):
        return super().calibrate(surface, market_vols)

In [11]:
input_dim = vol_surface.surface.size

nn_calibrator = NeuralNetworkCalibrator(
    input_dim=input_dim,
    hidden_layers=[128, 64, 32],
    epochs=100,
    batch_size=64,
    validation_split=0.1
)

vol_surface.calibrate(market_volatilities, nn_calibrator)
vol_surface.plot_surface()

In [12]:
class GradientBoostingCalibrator(Calibrator):
    """Калибровка с помощью градиентного бустинга"""
    def __init__(self):
        self.model = XGBRegressor(objective='reg:squarederror')
    
    def calibrate(self, surface: VolatilitySurface, market_vols: np.ndarray):
        """Обучение модели градиентного бустинга"""
        # TODO: Подготовка данных и обучение
        X_train = []  # Входные параметры
        y_train = []  # Целевые значения
        
        print("Gradient boosting calibration completed")

In [13]:
gb_calibrator = GradientBoostingCalibrator()
vol_surface.calibrate(market_volatilities, gb_calibrator)

Gradient boosting calibration completed
