<a href="https://colab.research.google.com/github/abcdise/master_thesis/blob/main/MasterThesis.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import numpy as np
import tensorflow as tf
import matplotlib.pyplot as plt

In [None]:
class EulerMaruyama:

  # ==== Public ==== #

  # Constructor
  def __init__(self, start_time: float, end_time: float, num_intervals: int, \
               diffusion_dimension=1, num_gaussian_noises=1, num_paths=10000, \
               only_gaussian_noise=True):
    self.__start_time = start_time
    self.__end_time = end_time
    self.__num_intervals = num_intervals
    self.__num_gaussian_noises = num_gaussian_noises
    self.__only_gaussian_noise = only_gaussian_noise
    self.__num_paths = num_paths
    self.__time_intervals = np.linspace(start_time,
                                       end_time,
                                       num=num_intervals+1)
    self.__paths = np.zeros(shape=(num_paths, num_intervals+1, diffusion_dimension))
    self.__diffusion_dimension = diffusion_dimension

  # Set and get private attributes
  def set_start_time(self, start_time):
    self.__start_time = start_time

  def set_end_time(self, end_time):
    self.__end_time = end_time

  def set_num_intervals(self, num_intervals):
    self.__num_intervals = num_intervals

  def set_time_intervals(self, time_intervals):
    self.__time_intervals = time_intervals

  def set_dimension(self, d):
    self.__diffusion_dimension = d

  def set_num_paths(self, num_paths):
    self.__num_paths = num_paths

  def get_num_intervals(self):
    return self.__num_intervals

  def get_time_intervals(self):
    return self.__time_intervals

  def get_start_time(self):
    return self.__start_time

  def get_end_time(self):
    return self.__end_time

  def get_dimension(self):
    return self.__dimension

  def get_num_paths(self):
    return self.__num_paths

  def get_paths(self):
    return self.__paths

  def get_num_gaussian_noise(self):
    return self.__num_gaussian_noises

  def get_gaussian_noise(self):
    return self.__noise

  # Public methods
  def update_noise(self):
    self.__noise = np.random.multivariate_normal(mean=np.zeros(self.__num_gaussian_noises),
                                                 cov=np.eye(self.__num_gaussian_noises),
                                                 size=(self.__num_paths, self.__num_intervals))

    for i in range(self.__num_intervals):
      h = self.__time_intervals[i+1] - self.__time_intervals[i]
      self.__noise[:,i,:] *= np.sqrt(h)

  def generate_paths(self, diffusion):
      for i in range(self.__num_paths):
          self.__paths[i] = self.__generate_path(diffusion, self.__noise[i])



  # ==== Private ==== #

  # Private methods

  # Generate a path of the diffusion
  def __generate_path(self, diffusion, noise):
    ret = np.zeros(shape=(self.__num_intervals+1, self.__diffusion_dimension))
    # Set the starting point of the diffusion
    x_0 = diffusion.get_starting_point()
    if type(x_0) == int:
      ret[0] = x_0 * np.ones(self.__diffusion_dimension)
    else:
      ret[0] = x_0
    # Simulate the following states
    for i in range(1, self.__num_intervals+1):
      h = self.__time_intervals[i] - self.__time_intervals[i-1]
      delta_drift = diffusion.drift(self.__time_intervals[i-1], ret[i-1]) * h
      delta_diffusion = np.matmul(diffusion.diffusion(self.__time_intervals[i-1], ret[i-1]),
                                  noise[i-1])
      ret[i] = ret[i-1] + delta_drift + delta_diffusion
    return ret





In [None]:
class ItoDiffusion:

  # ==== Public ==== #

  # Constructor
  def __init__(self, drift_parameter, diffusion_parameter, starting_point, starting_time=0):
    self.drift = drift_parameter
    self.diffusion = diffusion_parameter
    self.__starting_point = starting_point
    self.__starting_time = starting_time

  # Access to private attributes
  def get_starting_point(self):
    return self.__starting_point

  def get_starting_time(self):
    return self.__starting_time

  # Public methods
  def simulate(self, solver):
    solver.generate_paths(self)
    return solver.get_paths()





In [None]:
# Langevin Dynamics
b = lambda t, x: np.array([-x])
sigma = lambda t, x: np.array([[np.sqrt(2)]])
diffusion = ItoDiffusion(b, sigma, 0)
solver = EulerMaruyama(start_time=0, end_time=1, num_intervals=100)
solver.update_noise()
solver.generate_paths(diffusion)

array([[ 0.        ],
       [ 0.04202974],
       [ 0.35137317],
       [ 0.40977624],
       [ 0.36475182],
       [ 0.465465  ],
       [ 0.54853727],
       [ 0.55194424],
       [ 0.49905202],
       [ 0.33764689],
       [ 0.38724653],
       [ 0.46101301],
       [ 0.34246523],
       [ 0.30266022],
       [ 0.39934426],
       [ 0.6258136 ],
       [ 0.50924535],
       [ 0.34478064],
       [ 0.18500441],
       [ 0.18888429],
       [ 0.03468443],
       [-0.09355847],
       [ 0.10337238],
       [ 0.09742191],
       [ 0.01742765],
       [ 0.00634469],
       [ 0.21878054],
       [ 0.1589912 ],
       [-0.0796205 ],
       [-0.12256608],
       [-0.08553357],
       [ 0.10914103],
       [ 0.2328743 ],
       [ 0.14337008],
       [ 0.00358793],
       [ 0.02848384],
       [ 0.01575714],
       [-0.10001335],
       [-0.07572745],
       [ 0.13625402],
       [ 0.14489657],
       [ 0.32531028],
       [ 0.07556097],
       [ 0.18651869],
       [ 0.30072812],
       [ 0