# Brownian motion simulation
We want here to simulate a regular Brownian motion.
Then, we will try to simulate a fractional Brownian motion.
We also want to study the time required to make all these simulations.
## Importing packages

In [None]:
import numpy as np
import matplotlib.pyplot as plt
from scipy.integrate import quad
from time import perf_counter
from scipy.special import hyp2f1
from typing import Callable
%matplotlib inline

## Some global simulation parameters

In [None]:
N = 1000  # Number of time steps
times = []  # TImes for each simulation
time = np.linspace(10**-5, 10, N)  # Time array

## Simulating a regular Brownian motion

In [None]:
plt.figure(figsize=(15, 9))
plt.title('Simulating regular Brownian Motion')

t1 = perf_counter()

for n in range(5):
    W = [0]
    for i in range(1,N):
        W.append(W[i-1] + np.random.normal(0, 1))
    plt.plot(time, W, label=f'W{n+1}')

t2 = perf_counter()
print(f'Elapsed time for N = {N}: {t2-t1} s')
times.append(t2-t1)
plt.legend()
plt.show()

## Simulating a fractional Brownian motion
Now, we are going to simulate a fractional Brownian motion.
For that, we know:
$$\operatorname{Cov}(X_t, X_s) = \frac12 \left(t^{2\alpha} + s^{2\alpha} - \left|t-s\right|^{2\alpha}\right)$$
where $\alpha$ is the Hurst parameter.
- If $\alpha=\frac 12$, then we have a regular Brownian motion.
- If $\alpha<\frac12$, then all increments are negatively correlated.
- If $\alpha>\frac12$, then all increments are positively correlated.

Then, we will use the Cholesky decomposition to simulate a fractional Brownian motion.

How ever, when we have exactly $t=s=0$, then the covariance is not defined, as have $\sigma^2_{W_0}=0$.
So we will start from $t=s=10^{-5}$.

In [None]:
nb_alphas = 20  # Number of alphas to test

if nb_alphas % 2 == 0:
    nb_alphas += 1
    
alphas = []

for _ in range(nb_alphas // 2):
    alphas.append(np.random.uniform(0, 0.5))
    
alphas.append(0.5)

for _ in range(nb_alphas // 2):
    alphas.append(np.random.uniform(0.5, 1))
    
alphas.sort()
alphas = np.array(alphas)
print(f'Values for alpha: {alphas}.')
print(f'Number of alphas: {len(alphas)}.')

In [None]:
tmp = []
X_frac = {}

for alpha in alphas:
    Y_list = []
    plt.figure(figsize=(15, 9))
    plt.title(f'Simulating Fractionnal Brownian Motion with alpha = {alpha}')
    mat_cov = np.zeros((N, N))
    t1 = perf_counter()
    for t in range(N):
        for s in range(N):
            if (s > t):
                break
            mat_cov[t, s] = 0.5 * (time[t] ** (2 * alpha) + time[s] **
                                (2 * alpha) - np.abs(time[t] - time[s]) ** (2 * alpha))
            mat_cov[s,t] = mat_cov[t,s]

    C = np.linalg.cholesky(mat_cov)

    for i in range(5):
        Y = np.dot(C, np.random.normal(0, 1, N))
        Y[0] = 0
        plt.plot(time, Y, label=f'W{i+1}')
        Y_list.append(Y)
    t2 = perf_counter()
    
    
    print(f'Elapsed time for N = {N} and alpha = {alpha}: {t2-t1} s')
    tmp.append(t2-t1)
    plt.legend()
    plt.show()
    X_frac[alpha] = Y_list

times.append(np.mean(tmp))

## Riemann-Liouville Brownian motion

In [None]:
tmp = []
X_riem = {}
np.random.seed(42)
for alpha in alphas:
    Y_list = []
    plt.figure(figsize=(15, 9))
    plt.title(f'Simulating Riemann-Liouville Brownian Motion with alpha = {alpha}')
    mat_cov = np.zeros((N, N))
    
    t1 = perf_counter()

    for t in range(N):
        for s in range(N):
            if (s > t):
                break
            mat_cov[t, s] = quad(lambda x: (time[t] - x) ** (alpha - 0.5) * (time[s] - x) ** (alpha - 0.5), 0, time[s])[0] # Computing the integral
            mat_cov[s, t] = mat_cov[t, s]
    
    C = np.linalg.cholesky(mat_cov)

    for i in range(5):
        Y = np.dot(C, np.random.normal(0, 1, N))
        Y[0] = 0
        plt.plot(time, Y, label=f'W{i+1}')
        Y_list.append(Y)
        
    t2 = perf_counter()
    print(f'Elapsed time for N = {N} and alpha = {alpha}: {t2-t1} s')
    tmp.append(t2-t1)
    plt.legend()
    plt.show()
    X_riem[alpha] = Y_list

rl_time = tmp[:]

times.append(np.mean(tmp))
del tmp

We here notice that the last simulation takes a lot of time.

## Time required to simulate a Brownian motion
Here, we are going to plot the mean time required to simulate a Brownian motion of size $n=1000$.

In [None]:
names = ['Regular Brownian Motion', 'Fractionnal Brownian Motion', 'Riemann-Liouville Brownian Motion']
plt.figure(figsize=(15, 9))
plt.title(f'Time of computation for N = {N}')
plt.barh(names, times, color='blue')
plt.show()

We also want to plot the times for the Rieman-Liouville Brownian motion, depending on $\alpha$.

In [None]:
plt.figure(figsize=(15, 9))
plt.title(f'Time required regarding the value of alpha')
plt.scatter(alphas, rl_time, color='blue')
plt.show()

## Change of frequency for fractional Brownian motion

In [None]:
for alpha in alphas:
    L = []
    for X in X_frac[alpha]:
        s1 = 0
        s2 = 0
        for i in range(5,len(X)):
            s1 += np.abs(X[i] - 2 * X[i - 2] + X[i - 4]) ** 2
        for i in range(3,len(X)):
            s2 += np.abs(X[i] - 2 * X[i - 1] + X[i - 2]) ** 2
        L.append(np.log(s1 / s2) / (2 * np.log(2)) - alpha)
    print(f'For alpha = {alpha}: centered mean={np.mean(L)} and standard deviation={np.std(L)}')

This confirms that our model is correct, even if it's not highly efficient.
## Change of frequency for Riemann-Liouville Brownian motion

In [None]:
for alpha in alphas:
    L = []
    for X in X_riem[alpha]:
        s1 = 0
        s2 = 0
        for i in range(5,len(X)):
            s1 += np.abs(X[i] - 2 * X[i - 2] + X[i - 4]) ** 2
        for i in range(3,len(X)):
            s2 += np.abs(X[i] - 2 * X[i - 1] + X[i - 2]) ** 2
        L.append(np.log(s1 / s2) / (2 * np.log(2)) - alpha)
    print(f'For alpha = {alpha}: centered mean={np.mean(L)} and standard deviation={np.std(L)}')

This confirms that our model is correct, even if it's not highly efficient.

# Hybrid simulation
We will try to make two simulations: one using the gamma kernel $g(x)=x^\alpha\exp(-\lambda x)$, and the second using
the power-law kernel function $g(x)=x^\alpha (1+x)^{\beta-\alpha}$.
We also define the less kernel $L_g$, depending on the kernel $g$, by:
$$g(t-s)\approx (t-s)^\alpha L_g\left(\frac kn\right)\ \text{avec}\ t-s\in\left[\frac{k-1}n;\frac kn\right]\setminus\{0\}$$

In [None]:
def gamma_kernel(x : float, alpha : float, lambd : float) -> float:
    """Provides the gamma kernel

    Args:
        x (float): Value
        alpha (float): Alpha parameter, between -0.5 and 0.5.
        lambd (float): Lambda parameter, positive.

    Returns:
        float: Output of the kernel
    """
    assert -0.5 < alpha < 0.5, 'Alpha must be between -0.5 and 0.5.'
    assert lambd > 0, 'Lambda must be positive.'
    return x ** alpha * np.exp(-lambd * x)

def power_law_kernel(x: float, alpha : float, beta : float) -> float:
    """Provides the power law kernel

    Args:
        x (float): Value
        alpha (float): Alpha parameter, between -0.5 and 0.5.
        beta (float): Beta parameter, between -np.inf and -1/2

    Returns:
        float: Output of the kernel
    """
    assert -0.5 < alpha < 0.5, 'Alpha must be between -0.5 and 0.5.'
    assert beta < -0.5, 'Beta must be between -np.inf and -1/2'
    return x ** alpha * (1 + x) ** (beta - alpha)

def less_kernel(g : Callable[[float, tuple], float], k : int, n : int ,alpha : float, args : tuple) -> float:
    """Provides a less kernel, depending on the chosen kernel

    Args:
        g (Callable[[float, tuple], float]): _description_
        alpha (float): Alpha parameter
        k (int): Index
        n: (int) : Number of points
        args (tuple): Arguments of the kernel
    Returns:
        float: Value of the less kernel
    """
    assert -0.5 < alpha < 0.5, 'Alpha must be between -0.5 and 0.5.'
    assert 0 <= k < n, 'k must be inferior to n'
    mean = np.mean([(k - 1) / n, k / n])
    if np.abs(mean) <= 1e-6:
        mean = 1e-6
    return g(mean, alpha, *args) / (mean ** alpha)

def beta_k_star(k : int, alpha : float) -> float:
    """Provides the value of beta_k_star

    Args:
        k (int): Index
        alpha (float): Alpha parameter

    Returns:
        float: Value of beta_k_star
    """
    assert -0.5 < alpha < 0.5, 'Alpha must be between -0.5 and 0.5.'
    assert k > 0, 'k must be positive'
    core = (k ** (alpha + 1) - (k - 1) ** (alpha + 1)) / (alpha + 1)
    if alpha > 0:#Python cannot natively compute negative decimal exponents
        return core ** (1 / alpha)
    return 1 / (core ** (1 / alpha))

Now, we also need to simulate all the following random variables:
- $\displaystyle W^n_{i,j}=\int_{\frac in}^{\frac{i+1}n}\left(\frac{i+j}n-s\right)^\alpha\,\mathrm dW_s$.
- $\displaystyle W_i^n=\int_{\frac in}^{\frac{i+1}n}\mathrm{d}W_s$.

In [None]:
def Wnij(n: int, k : int, alpha: float = 0.5) -> np.ndarray[float]:
    """Generates a random variable following the distribution of W_{n,i,j}

    Args:
        n (int): Size of the sample
        k (int): Size of the vector
        alpha (float, optional): Alpha parameter. Defaults to 0.5.

    Returns:
        np.ndarray[float]: Returns a vector of size k + 1 that fits with the law.
    """
    def build_cov_mat(k,n,alpha) -> np.ndarray[float]:
        output = np.zeros((k+1,k+1))
        cov_mat = np.zeros((k+1,k+1))
        cov_mat[0,0] = 1 / n
        for i in range(1,k+1):
            cov_mat[0,i] = ((i) ** (alpha + 1) - (i - 1) ** (alpha + 1)) / ((alpha + 1) * n ** (alpha + 1))
            cov_mat[i,0] = cov_mat[0,i]
            cov_mat[i,i] = ((i) ** (2 * alpha + 1) - (i - 1) ** (2 * alpha + 1)) / ((2 * alpha + 1) * n ** (2 * alpha + 1))
        for j in range(1,k+1):
            for i in range(1,j):
                if j == 1:
                    cov_mat[i,j] = 1 / ((alpha + 1) * n ** (2 * alpha + 1)) * (
                    i ** (alpha + 1) * j ** alpha *
                    hyp2f1(-alpha,1,alpha + 2, i / j)
                    )
                    cov_mat[j,i] = cov_mat[i,j]
                else:
                    cov_mat[i,j] = 1 / ((alpha + 1) * n ** (2 * alpha + 1)) * (
                    i ** (alpha + 1) * j ** alpha *
                    hyp2f1(-alpha,1,alpha + 2, i / j) -
                    (i - 1) ** (alpha + 1) * (j - 1) ** (alpha + 1) *
                    hyp2f1(-alpha,1,alpha + 2, (i-1) / (j-1))
                    )
                    cov_mat[j,i] = cov_mat[i,j]
        return cov_mat
    output = np.zeros((k+1,k+1))
    output[:,0] = np.random.normal(0, 1, k+1)
    for i in range(2,k+1): 
        cov_mat = build_cov_mat(k,n,alpha)
        W = np.dot(np.linalg.cholesky(cov_mat), np.random.normal(0, 1, cov_mat.shape[0]))
        output[:,i] = W
    return output

def Wni(n: int) -> np.ndarray[float]:
    """Generates a normal sample of size n

    Args:
        n (int): Size of the sample

    Returns:
        np.ndarray[float]: Normal vector
    """
    return np.random.normal(size = n)

In [None]:
Nn = 50

def X_check(n: int, i: int, kappa: int, g : Callable[[float,tuple], float], alpha: float, W : np.ndarray, args: tuple = ()) -> float:
    """Computes the value of the variable X_check

    Args:
        n (int): Number of points
        i (int): Index of the variable
        kappa (int): Upper bound
        g (Callable[[float,tuple], float]): Kernel
        alpha (float): Alpha setting
        W (np.ndarray): Matrix of random variables
        args (tuple, optional): Arguments of the kernel. Defaults to ().

    Returns:
        float: Value of the first part of the variable
    """
    return np.sum([less_kernel(g, k, n, alpha, args) * W[i-k + Nn,k] for k in range(1, kappa + 1)])

def X_hat(n: int, i: int, kappa: int, g : Callable[[float,tuple], float], alpha: float, W  : np.ndarray, args: tuple = ()) -> float:
    """Computes the value of the variable X_hat

    Args:
        n (int): Number of points
        i (int): Index of the variable
        kappa (int): Lower bound
        g (Callable[[float,tuple], float]): Kernel
        alpha (float): Alpha setting
        W (np.ndarray): Vector of random variables
        args (tuple, optional): Arguments of the kernel. Defaults to ().

    Returns:
        float: Value of the second part of the variable
    """
    return np.sum([g(beta_k_star(k,alpha), alpha, *args) * W[i-k + Nn] for k in range(kappa + 1, n)])

def X_sum(n: int, i: int, kappa: int, g : Callable[[float,tuple], float], alpha: float, W1 : np.ndarray, W2 : np.ndarray, args: tuple = ()) -> float:
    """Computes the value of the variable X_sum

    Args:
        n (int): Number of points
        i (int): Index of the variable
        kappa (int): Lower bound
        g (Callable[[float,tuple], float]): Kernel
        alpha (float): Alpha setting
        W1 (np.ndarray): Matrix of random variables
        W2 (np.ndarray): Vector of random variables
        args (tuple, optional): Arguments of the kernel. Defaults to ().

    Returns:
        float: Value of the variable
    """
    return X_check(n, i, kappa, g, alpha, W1, args) + X_hat(n, i, kappa, g, alpha, W2, args)

In [None]:
kappa = 2

n = len(time)

for alpha in alphas:
    for _ in range(5):
        W1 = Wnij(n, kappa, alpha)
        W2 = Wni(n)
        X = np.array([X_sum(n, i, kappa, gamma_kernel, alpha, W1, W2, (1,)) for i in range(n)])
        plt.plot(time, X, label = f'alpha = {alpha}')
        plt.show()