In [None]:
#Imports and Class Initialization
import time
from typing import Any, Callable, Dict, List, Tuple

import numpy as np
import pandas as pd
import plotly.graph_objects as go
from tqdm import tqdm


class FXBarrierOptionPricer:
    """
    Monte Carlo pricer for FX barrier options with support for American-style exercise
    """

    def __init__(
        self,
        num_paths: int = 100000,
        num_steps: int = 252,
        seed: int = 42,
        antithetic: bool = True,
        control_variate: bool = True,
    ):
        """
        Pricer initialization
        """
        self.num_paths = num_paths
        self.num_steps = num_steps
        self.seed = seed
        self.antithetic = antithetic
        self.control_variate = control_variate
        np.random.seed(seed)

    # Part 1: Methods for Path Generation and Option Pricing
    # The price of a barrier option is typically quoted per unit of notional.
    # The notional is a simple scalar factor and does not influence the other
    # input variables and the price itself. the scaling of dividing by K
    # reasonable because instead of working with three different variables, we
    # simplify the adimensional input of the model. This ratio allows to
    # capture the sensitivity of spot levels and barriers Dividing by K is a
    # common way to normalize the input range.
    def generate_paths(
        self,
        S0_K: float,
        T: float,
        r: float,
        sigma: float,
        sigma_curve: Callable[[np.ndarray], np.ndarray] | None = None,
    ) -> np.ndarray:
        """
        Generate price paths for the Black-Scholes model
        """
        dt = T / self.num_steps
        effective_paths = self.num_paths // 2 if self.antithetic else self.num_paths
        Z = np.random.normal(0, 1, size=(effective_paths, self.num_steps))

        if self.antithetic:
            paths = np.zeros((self.num_paths, self.num_steps + 1))
            paths[:, 0] = S0_K
        else:
            paths = np.zeros((effective_paths, self.num_steps + 1))
            paths[:, 0] = S0_K

        time_grid = np.linspace(0, T, self.num_steps + 1)

        for t in range(1, self.num_steps + 1):
            if sigma_curve is not None:
                vol = sigma_curve(time_grid[t])
            else:
                vol = sigma

            if self.antithetic:
                increment_1 = (r - 0.5 * vol**2) * dt + vol * np.sqrt(dt) * Z[:, t - 1]
                increment_2 = (r - 0.5 * vol**2) * dt - vol * np.sqrt(dt) * Z[:, t - 1]

                paths[:effective_paths, t] = paths[:effective_paths, t - 1] * np.exp(
                    increment_1
                )
                paths[effective_paths:, t] = paths[effective_paths:, t - 1] * np.exp(
                    increment_2
                )
            else:
                paths[:, t] = paths[:, t - 1] * np.exp(
                    (r - 0.5 * vol**2) * dt + vol * np.sqrt(dt) * Z[:, t - 1]
                )

        return paths

    def price_european_barrier(
        self,
        S0_K: float,
        B_K: float,
        T: float,
        r: float,
        sigma: float,
        option_type: str = "call",
        barrier_type: str = "up-and-out",
        sigma_curve: Callable[[np.ndarray], np.ndarray] | None = None,
    ) -> Dict:
        """
        Calculate the price of a European barrier option using Monte Carlo
        """
        paths = self.generate_paths(S0_K, T, r, sigma, sigma_curve)
        barrier_crossed = np.zeros(self.num_paths, dtype=bool)

        if "up" in barrier_type:
            barrier_crossed = np.any(paths >= B_K, axis=1)
        else:
            barrier_crossed = np.any(paths <= B_K, axis=1)

        final_prices = paths[:, -1]
        if option_type == "call":
            payoffs = np.maximum(final_prices - 1.0, 0)
        else:
            payoffs = np.maximum(1.0 - final_prices, 0)

        if "out" in barrier_type:
            payoffs[barrier_crossed] = 0
        else:
            payoffs[~barrier_crossed] = 0

        discount_factor = np.exp(-r * T)

        if self.control_variate and option_type == "call" and "out" not in barrier_type:
            d1 = (np.log(S0_K) + (r + 0.5 * sigma**2) * T) / (sigma * np.sqrt(T))
            d2 = d1 - sigma * np.sqrt(T)

            import scipy.stats as stats

            bs_price = S0_K * stats.norm.cdf(d1) - np.exp(-r * T) * stats.norm.cdf(d2)

            vanilla_payoffs = np.maximum(final_prices - 1.0, 0)
            vanilla_mc = discount_factor * np.mean(vanilla_payoffs)

            beta = 1.0
            control_correction = beta * (bs_price - vanilla_mc)
            price = discount_factor * np.mean(payoffs) + control_correction
        else:
            price = discount_factor * np.mean(payoffs)

        std_err = discount_factor * np.std(payoffs) / np.sqrt(self.num_paths)

        return {
            "price": price,
            "std_err": std_err,
            "rel_err": std_err / price if price > 0 else np.nan,
            "conf_95": 1.96 * std_err,
        }

    def longstaff_schwartz_american(
        self,
        paths: np.ndarray,
        r: float,
        T: float,
        payoff_func: Callable[[np.ndarray], np.ndarray],
        basis_funcs: List[Callable[[np.ndarray], np.ndarray]] | None = None,
    ) -> float:
        """
        Implementation of Longstaff-Schwartz algorithm for American options
        """
        num_paths, num_steps_plus_one = paths.shape
        num_steps = num_steps_plus_one - 1
        dt = T / num_steps

        if basis_funcs is None:
            basis_funcs = [
                lambda x: np.ones_like(x),
                lambda x: x,
                lambda x: x**2,
                lambda x: x**3,
            ]

        cash_flows = np.zeros((num_paths, num_steps_plus_one))
        cash_flows[:, -1] = payoff_func(paths[:, -1])

        for t in range(num_steps - 1, 0, -1):
            exercise_value = payoff_func(paths[:, t])
            itm_indices = np.where(exercise_value > 0)[0]

            if len(itm_indices) > 0:
                X = np.column_stack([f(paths[itm_indices, t]) for f in basis_funcs])
                Y = np.exp(-r * dt) * cash_flows[itm_indices, t + 1]
                beta, _, _, _ = np.linalg.lstsq(X, Y, rcond=None)
                continuation_value = np.dot(X, beta)

                for i, path_idx in enumerate(itm_indices):
                    if exercise_value[path_idx] > continuation_value[i]:
                        cash_flows[path_idx, t] = exercise_value[path_idx]
                        cash_flows[path_idx, t + 1 :] = 0
                    else:
                        cash_flows[path_idx, t] = 0

        discount_factors = np.exp(-r * np.arange(num_steps_plus_one) * dt)
        discounted_cash_flows = cash_flows * discount_factors[np.newaxis, :]
        price = np.mean(np.sum(discounted_cash_flows, axis=1))

        return price

    def tilley_american(
        self,
        paths: np.ndarray,
        r: float,
        T: float,
        payoff_func: Callable[[np.ndarray], np.ndarray],
        num_bundles: int = 20,
    ) -> float:
        """
        Implementation of Tilley's algorithm for American options
        """
        num_paths, num_steps_plus_one = paths.shape
        num_steps = num_steps_plus_one - 1
        dt = T / num_steps

        option_values = np.zeros((num_paths, num_steps_plus_one))
        option_values[:, -1] = payoff_func(paths[:, -1])

        for t in range(num_steps - 1, 0, -1):
            exercise_values = payoff_func(paths[:, t])
            sorted_indices = np.argsort(paths[:, t])
            bundle_size = max(1, num_paths // num_bundles)

            continuation_values = np.zeros(num_paths)

            for i in range(0, num_paths, bundle_size):
                end_idx = min(i + bundle_size, num_paths)
                bundle_indices = sorted_indices[i:end_idx]
                continuation_values[bundle_indices] = np.exp(-r * dt) * np.mean(
                    option_values[bundle_indices, t + 1]
                )

            exercise_decision = exercise_values > continuation_values
            option_values[:, t] = np.where(
                exercise_decision, exercise_values, continuation_values
            )

        price = np.exp(-r * dt) * np.mean(option_values[:, 1])

        return price

    # Part 2: Parameter Space Sampling and Results Visualization
    def sample_parameter_space(
        self,
        bounds: Dict[str, Tuple[float, float]],
        num_samples: int = 1000,
        method: str = "uniform",
        stratified: bool = True,
    ) -> Dict[str, np.ndarray]:
        """
        Sample uniformly the parameter space in the hypercube defined by bounds
        """
        np.random.seed(self.seed)
        samples = {}

        if method == "uniform":
            if stratified:
                for param, (lower, upper) in bounds.items():
                    bins = np.linspace(0, 1, int(np.sqrt(num_samples)) + 1)
                    inds = np.random.randint(0, len(bins) - 1, num_samples)
                    random_offsets = np.random.random(num_samples)
                    samples[param] = lower + (upper - lower) * (
                        bins[inds] + random_offsets * (bins[inds + 1] - bins[inds])
                    )
            else:
                for param, (lower, upper) in bounds.items():
                    samples[param] = lower + (upper - lower) * np.random.random(
                        num_samples
                    )

        elif method == "latin_hypercube":
            try:
                from scipy.stats.qmc import LatinHypercube

                param_names = list(bounds.keys())
                n_dims = len(param_names)

                sampler = LatinHypercube(d=n_dims, rng=self.seed)
                lhs_samples = sampler.random(n=num_samples)

                for i, param in enumerate(param_names):
                    lower, upper = bounds[param]
                    samples[param] = lower + (upper - lower) * lhs_samples[:, i]
            except ImportError:
                print("scipy.stats.qmc not available, using uniform sampling")
                for param, (lower, upper) in bounds.items():
                    samples[param] = lower + (upper - lower) * np.random.random(
                        num_samples
                    )

        elif method == "sobol":
            try:
                from scipy.stats.qmc import Sobol

                param_names = list(bounds.keys())
                n_dims = len(param_names)

                sampler = Sobol(d=n_dims, rng=self.seed)
                sobol_samples = sampler.random(n=num_samples)

                for i, param in enumerate(param_names):
                    lower, upper = bounds[param]
                    samples[param] = lower + (upper - lower) * sobol_samples[:, i]
            except ImportError:
                print("scipy.stats.qmc not available, using uniform sampling")
                for param, (lower, upper) in bounds.items():
                    samples[param] = lower + (upper - lower) * np.random.random(
                        num_samples
                    )
        else:
            raise ValueError(f"Sampling method '{method}' not recognized")

        return samples

    def run_pricing_grid(
        self,
        samples: Dict[str, np.ndarray],
        option_type: str = "call",
        barrier_type: str = "up-and-out",
        american: bool = False,
        american_method: str = "longstaff_schwartz",
        sigma_curve: Callable[[np.ndarray], np.ndarray] | None = None,
        use_tqdm: bool = True,
        save_results: bool = True,
    ) -> Dict[str, Any]:
        """
        Run pricing on a grid of parameters sampled in the hypercube
        """
        num_samples = len(next(iter(samples.values())))
        prices = np.zeros(num_samples)
        errors = np.zeros(num_samples)

        if option_type == "call":
            payoff_func = lambda x: np.maximum(x - 1.0, 0)
        else:
            payoff_func = lambda x: np.maximum(1.0 - x, 0)

        start_time = time.time()
        iterator = tqdm(range(num_samples)) if use_tqdm else range(num_samples)

        for i in iterator:
            S0_K = samples["S0_K"][i]
            B_K = samples["B_K"][i]
            T = samples["T"][i]
            r = samples["r"][i]
            sigma = samples["sigma"][i]

            if not american:
                result = self.price_european_barrier(
                    S0_K, B_K, T, r, sigma, option_type, barrier_type, sigma_curve
                )
                prices[i] = result["price"]
                errors[i] = result["std_err"]
            else:
                paths = self.generate_paths(S0_K, T, r, sigma, sigma_curve)
                barrier_crossed = np.zeros(
                    (self.num_paths, self.num_steps + 1), dtype=bool
                )

                if "up" in barrier_type:
                    barrier_crossed = paths >= B_K
                else:
                    barrier_crossed = paths <= B_K

                if "out" in barrier_type:
                    first_cross = np.zeros(self.num_paths, dtype=int)

                    for p in range(self.num_paths):
                        cross_indices = np.where(barrier_crossed[p])[0]
                        if len(cross_indices) > 0:
                            first_cross[p] = cross_indices[0]
                        else:
                            first_cross[p] = self.num_steps + 1

                    def barrier_payoff_func(x, t):
                        payoffs = payoff_func(x)
                        for p in range(len(x)):
                            if t >= first_cross[p]:
                                payoffs[p] = 0
                        return payoffs

                    if american_method == "longstaff_schwartz":
                        modified_paths = paths.copy()
                        for p in range(self.num_paths):
                            if first_cross[p] <= self.num_steps:
                                if option_type == "call":
                                    modified_paths[p, first_cross[p] :] = 0
                                else:
                                    modified_paths[p, first_cross[p] :] = 2

                        prices[i] = self.longstaff_schwartz_american(
                            modified_paths, r, T, payoff_func
                        )
                    else:
                        modified_paths = paths.copy()
                        for p in range(self.num_paths):
                            if first_cross[p] <= self.num_steps:
                                if option_type == "call":
                                    modified_paths[p, first_cross[p] :] = 0
                                else:
                                    modified_paths[p, first_cross[p] :] = 2

                        prices[i] = self.tilley_american(
                            modified_paths, r, T, payoff_func
                        )

                    errors[i] = np.nan

            if not use_tqdm and (i + 1) % 100 == 0 or i == num_samples - 1:
                elapsed = time.time() - start_time
                remaining = (elapsed / (i + 1)) * (num_samples - i - 1)
                print(
                    f"Completed {i + 1}/{num_samples} samples. Remaining time: {remaining:.1f}s"
                )

        results_data = {**samples, "price": prices, "error": errors}
        results_df = pd.DataFrame(results_data)

        if save_results:
            timestamp = time.strftime("%Y%m%d_%H%M%S")
            filename = f"fx_barrier_{barrier_type}_{option_type}_{'american' if american else 'european'}_{timestamp}.csv"
            results_df.to_csv(filename, index=False)
            print(f"Results saved in {filename}")

        return {
            "samples": samples,
            "prices": prices,
            "errors": errors,
            "option_type": option_type,
            "barrier_type": barrier_type,
            "american": american,
            "american_method": american_method if american else None,
            "dataframe": results_df,
        }

    def plot_results(self, results: Dict, param_x: str, param_y: str | None = None):
        """
        Visualize pricing results using Plotly for interactive 3D plots
        """
        samples = results["samples"]
        prices = results["prices"]

        if param_y is None:
            fig = go.Figure(
                data=go.Scatter(
                    x=samples[param_x],
                    y=prices,
                    mode="markers",
                    marker=dict(
                        size=8, color=prices, colorscale="Viridis", showscale=True
                    ),
                )
            )

            fig.update_layout(
                title=f"Price of {results['barrier_type']} {results['option_type']} "
                + f"{'American' if results['american'] else 'European'} option",
                xaxis_title=param_x,
                yaxis_title="Option price",
            )
        else:
            fig = go.Figure(
                data=[
                    go.Scatter3d(
                        x=samples[param_x],
                        y=samples[param_y],
                        z=prices,
                        mode="markers",
                        marker=dict(
                            size=8, color=prices, colorscale="Viridis", showscale=True
                        ),
                    )
                ]
            )

            fig.update_layout(
                title=f"Price of {results['barrier_type']} {results['option_type']} "
                + f"{'American' if results['american'] else 'European'} option",
                scene=dict(
                    xaxis_title=param_x,
                    yaxis_title=param_y,
                    zaxis_title="Option price",
                    aspectmode="cube",
                ),
                margin=dict(l=0, r=0, b=0, t=0),
            )

        fig.show()


if __name__ == "__main__":
    pricer = FXBarrierOptionPricer(num_paths=5000, num_steps=252)

    # Parameters for up-and-out call options
    call_param_bounds = {
        "S0_K": (0.8, 1.0),  # Focus on values below strike for calls
        "B_K": (1.05, 1.5),  # Barrier definitely above the strike
        "T": (0.1, 2.0),
        "r": (0.0, 0.1),
        "sigma": (0.1, 0.5),
    }

    # Parameters for up-and-out put options
    put_param_bounds = {
        "S0_K": (0.7, 1.1),  # Wider range for puts
        "B_K": (1.05, 1.5),  # Same logic: barrier above the strike
        "T": (0.1, 2.0),
        "r": (0.0, 0.1),
        "sigma": (0.1, 0.5),
    }

    # Number of samples and method
    num_samples = 10000
    sampling_method = "latin_hypercube"  # More efficient sampling

    # Generate samples for calls
    call_samples = pricer.sample_parameter_space(
        call_param_bounds,
        num_samples=num_samples,
        method=sampling_method,
        stratified=True,
    )

    # Part 3: Calculate prices for European up-and-out call options
    european_call_results = pricer.run_pricing_grid(
        call_samples,
        option_type="call",
        barrier_type="up-and-out",
        american=False,
        use_tqdm=True,
    )

    pricer.plot_results(european_call_results, param_x="B_K")
    pricer.plot_results(european_call_results, param_x="S0_K", param_y="B_K")

    # Generate samples for puts
    put_samples = pricer.sample_parameter_space(
        put_param_bounds,
        num_samples=num_samples,
        method=sampling_method,
        stratified=True,
    )

    # Calculate prices for European up-and-out put options
    european_put_results = pricer.run_pricing_grid(
        put_samples,
        option_type="put",
        barrier_type="up-and-out",
        american=False,
        use_tqdm=True,
    )

    pricer.plot_results(european_put_results, param_x="B_K")
    pricer.plot_results(european_put_results, param_x="S0_K", param_y="B_K")


In [None]:
import random

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import tensorflow as tf
from keras import callbacks
from keras.layers import Activation, Add, Dense, Input
from keras.models import Model
from sklearn.model_selection import train_test_split

In [None]:
# Fix random seeds for reproducibility
SEED = 42
np.random.seed(SEED)
random.seed(SEED)
tf.random.set_seed(SEED)

In [None]:
# Load and concatenate CSV files
csv_files = [
    "./fx_barrier_up-and-out_put_european_20250627_233030.csv",
    "./fx_barrier_up-and-out_call_european_20250627_231950.csv",
]
df_list = [pd.read_csv(f, delimiter=",", header=0) for f in csv_files]
df = pd.concat(df_list, ignore_index=True)

# Drop rows where the price is NaN and remove duplicates
df = df.dropna().drop_duplicates()

# Prepare features and target
X = df.drop("price", axis=1)  # Features
y = df["price"]  # Target variable

# Split the data into train and test sets
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=SEED
)


# Define residual block function
def residual_block(x, units=100):
    shortcut = x
    out = Dense(units, activation="gelu")(x)
    out = Dense(units)(out)
    out = Add()([shortcut, out])
    out = Activation("gelu")(out)
    return out


# Build model with residual blocks
inputs = Input(shape=(X_train.shape[1],))
x = Dense(100, activation="gelu")(inputs)

for _ in range(3):
    x = residual_block(x, units=100)

outputs = Dense(1, activation="gelu")(x)
model = Model(inputs=inputs, outputs=outputs)

# Compile the model
model.compile(optimizer="adamw", loss="mae", metrics=["mae"])

# Learning rate scheduler callback
lr_scheduler = callbacks.ReduceLROnPlateau(
    monitor="val_loss", factor=0.5, patience=5, min_lr=1e-7, verbose=1
)

# Train the model
history = model.fit(
    X_train, y_train, epochs=100, validation_split=0.2, callbacks=[lr_scheduler]
)

# Evaluate on the test set
model.evaluate(X_test, y_test)
model.save("trained_modelEU.keras")

# Predict on the test set
y_pred = model.predict(X_test)

# Calculate absolute percentage errors
ape = np.abs((y_test - y_pred.flatten()) / y_test)

# Calculate the fraction of test samples with error <= 1%
frac_within_1pct = np.mean(ape <= 0.01)

print(f"Fraction of test samples with <= 1% absolute error: {frac_within_1pct:.4f}")

if frac_within_1pct >= 0.99:
    print("Success: At least 99% of test samples have error <= 1%.")
else:
    print("Warning: Less than 99% of test samples have error <= 1%.")

In [None]:
# Plot training and validation loss
plt.plot(history.history["loss"], label="Train Loss")
plt.plot(history.history["val_loss"], label="Val Loss")
plt.xlabel("Epoch")
plt.ylabel("Loss (MAE)")
plt.title("Training and Validation Loss")
plt.legend()
plt.grid(True)
plt.show()

In [2]:
import numpy as np
from keras.models import Model, load_model


def NeuralNetworkPricer(S0_K, B_K, T, r, sigma, error):
    model = load_model("trained_modelEU.keras")
    return model.predict(np.array([[S0_K, B_K, T, r, sigma, error]]), verbose=0)[0][0]


price = NeuralNetworkPricer(
    0.933721665116849,
    1.0527040055999566,
    1.441423447424124,
    0.08992690681590387,
    0.10052181375702329,
    7.508051677402084e-05,
)
print(price)

0.0010471256
