In [None]:
from matplotlib import pyplot as plt
import numpy as np
from numba import jitclass, int32, float64
from tqdm.notebook import tqdm
from random import random, gauss, seed
from collections import namedtuple
import operator
import math

from enum import Enum

# Unit displacements a lattice walker can make in one jump
class Direction(Enum):
    """Signed unit step along the one-dimensional lattice.

    The member ``value`` is added directly to the walker position.
    """

    RIGHT = 1  # move one site towards positive positions
    LEFT = -1  # move one site towards negative positions


# Initialize random number seed (for reproducibility).
# This seeds the generator of Python's built-in `random` module, i.e. the one
# behind the `random()` / `gauss()` functions imported above. NumPy's generator
# is a separate one and is not seeded by this call.
seed(42)

# Class definitions

In [None]:
# Lattice class
class Lattice:
    """Histogram of walk end points on a 1D lattice, plus reference Gaussians.

    Attributes:
        MAX_LATTICE: largest absolute site index that is tracked; the lattice
            covers the sites ``-MAX_LATTICE .. +MAX_LATTICE`` inclusive.
        distro: integer counts per site; ``distro[i]`` belongs to the site
            ``i - MAX_LATTICE``.
        n_jumps: number of jumps per walk, used by the reference curves.
        step_var: step length variance ``<l^2>`` (defaults to 1.0).
    """

    def __init__(self, num_jumps, step_var=None):
        """Create an empty histogram for walks of ``num_jumps`` jumps.

        Args:
            num_jumps: number of jumps performed in each walk.
            step_var: step length variance; ``None`` means 1.0.
        """
        self.MAX_LATTICE = 100000
        self.distro = np.zeros(shape=(2 * self.MAX_LATTICE + 1,), dtype=np.int32)
        self.n_jumps = num_jumps
        self.step_var = step_var if step_var is not None else 1.0

    def update_lattice(self, curr_pos):
        """Count one walk that ended at ``curr_pos``.

        The position is rounded to the nearest lattice site. End points whose
        site falls outside ``-MAX_LATTICE .. +MAX_LATTICE`` (and non-finite
        positions) are ignored.

        Args:
            curr_pos: final position of a walk (int or float).
        """
        # NaN / inf cannot be rounded to a site; ignore them like any other
        # out-of-range position instead of raising.
        if not math.isfinite(curr_pos):
            return

        # Decide on the *rounded* site so that the two edge sites, which the
        # array does have slots for, are reachable and treated like any other.
        site = int(round(curr_pos))
        if abs(site) <= self.MAX_LATTICE:
            self.distro[site + self.MAX_LATTICE] += 1

    def return_results(self, norm, walks_per_cycle=1000):
        """Return the normalised histogram together with two reference curves.

        Only the sites where the histogram is non-empty or where the discrete
        reference curve exceeds 1e-6 are returned.

        Args:
            norm: number of cycles that were accumulated.
            walks_per_cycle: number of walks performed in every cycle; the
                counts are divided by ``walks_per_cycle * norm``. The default
                of 1000 is the value that used to be hard-coded here.

        Returns:
            Tuple ``(sites, histogram, discrete, continuous)`` of 1D arrays of
            equal length: the site indices, the normalised counts, and the two
            Gaussian reference curves evaluated at those sites.
        """
        # Signed site index of every histogram slot, as floats.
        offsets = np.arange(
            -self.MAX_LATTICE, self.MAX_LATTICE + 1, dtype=np.float64
        )

        # Gaussian with prefactor sqrt(2 / (pi * N * var)), i.e. exactly twice
        # the continuous density below (presumably because a lattice walker
        # only reaches every second site -- confirm against the lecture notes).
        discrete_distro = np.exp(
            0.5 * np.log(2.0 / (self.n_jumps * math.pi * self.step_var))
            - (offsets ** 2) / (2.0 * self.n_jumps * self.step_var)
        )

        # Normal density with zero mean and variance N * var.
        continuous_distro = (
            1 / np.sqrt(2 * math.pi * self.step_var * self.n_jumps)
        ) * np.exp(-(offsets ** 2) / (2 * self.step_var * self.n_jumps))

        # Computed once and reused for all four returned arrays.
        visible = (self.distro > 0.5) | (discrete_distro > 1e-6)

        return (
            np.argwhere(visible).flatten() - self.MAX_LATTICE,
            self.distro[visible] / (walks_per_cycle * norm),
            discrete_distro[visible],
            continuous_distro[visible],
        )

In [None]:
# Sample class
# Attribute layout handed to numba's jitclass: (name, numba type) per field.
spec = [
    ("T_MAX", int32),  # number of time lags tracked (length of msd / count)
    ("T0_MAX", int32),  # number of time origins stored (length of tt0 / origin)
    ("msd", float64[:]),  # accumulated squared displacement per time lag
    ("count", int32[:]),  # number of samples accumulated per time lag
    ("tt0", int32[:]),  # step number at which each time origin was stored
    ("origin", float64[:]),  # walker position at each stored time origin
    ("t_vacf", int32),  # step counter inside the current walk
]


# NOTE(review): `from numba import jitclass` was moved to `numba.experimental`
# in newer Numba releases -- confirm against the installed version.
@jitclass(spec)
class Sample(object):
    """Accumulates the mean squared displacement (MSD) over many walks.

    Every sampled step is stored as a new time origin; the squared
    displacement from each stored origin to the current position is added to
    the bin of the corresponding time lag.
    """

    def __init__(self):
        self.T_MAX = 10000
        self.T0_MAX = 100000
        self.msd = np.zeros(self.T_MAX, dtype=np.float64)
        self.count = np.zeros(self.T_MAX, dtype=np.int32)
        self.tt0 = np.zeros(self.T0_MAX, dtype=np.int32)
        self.origin = np.zeros(self.T0_MAX, dtype=np.float64)
        self.t_vacf = 0

    def start_new_walk(self):
        """Restart the step counter; accumulated msd / count are kept."""
        # Stale entries of tt0 / origin are never read: sample_walk only looks
        # at the first min(t_vacf, T0_MAX) slots, which it has just rewritten.
        self.t_vacf = 0

    def sample_walk(self, curr_pos):
        """Record ``curr_pos`` as a new time origin and update the MSD bins.

        Args:
            curr_pos: walker position after the step that was just performed.
        """
        # Store the current step as a time origin (ring buffer of T0_MAX slots).
        self.t_vacf += 1
        slot = (self.t_vacf - 1) % self.T0_MAX
        self.tt0[slot] = self.t_vacf
        self.origin[slot] = curr_pos

        # Time lag and displacement with respect to every stored origin.
        n_origins = min(self.t_vacf, self.T0_MAX)
        dt = self.t_vacf - self.tt0[:n_origins]
        disp = curr_pos - self.origin[:n_origins]

        # Keep only lags that fit into the accumulators. The SAME mask must be
        # applied to the displacements: taking the first len(dt) displacements
        # instead would pair the short lags with the oldest origins as soon as
        # a walk gets longer than T_MAX steps.
        keep = dt < self.T_MAX
        dt = dt[keep]
        self.count[dt] += 1
        self.msd[dt] += disp[keep] ** 2

    def return_results(self):
        """Return the MSD for every time lag that received at least one sample."""
        count_mask = self.count > 0.5
        return self.msd[count_mask] / self.count[count_mask]

# Convenience functions

In [None]:
# Function to plot the results
def plot_results(
    xbins,
    distribution,
    discrete_distribution,
    analytical_distribution,
    msd,
    N_JUMPS=100,
    N_CYCLES=100,
    step=1,
    format_fig="pdf",
    method="unbiased",
):
    """Draw the end-point distribution and the MSD side by side and save them.

    Args:
        xbins: lattice positions shared by the three distributions.
        distribution: sampled distribution (drawn as bars).
        discrete_distribution: discrete reference curve (red dots).
        analytical_distribution: continuous reference curve (green dots).
        msd: sampled mean squared displacement, plotted against its index.
        N_JUMPS: number of jumps per walk; used in the labels, in the file
            name and as the length of the analytical MSD line.
        N_CYCLES: number of cycles; used in the title and in the file name.
        step: slope of the analytical MSD line ``step * n``.
        format_fig: file extension of the saved figure.
        method: step method name; used in the file name only.

    The figure is written to
    ``NJUMPS_<N_JUMPS>_CYCLES_<N_CYCLES>_<method>.<format_fig>`` in the
    current working directory.
    """
    with plt.rc_context({"figure.dpi": 150}):
        fig, (dist_ax, msd_ax) = plt.subplots(1, 2, figsize=(8, 4))

        # Left panel: sampled histogram against both reference curves.
        dist_ax.bar(xbins, height=distribution, width=0.4, label="Statistical")
        dist_ax.plot(xbins, discrete_distribution, "ro", label="Discrete")
        dist_ax.plot(xbins, analytical_distribution, "go", label="Continuous")
        dist_ax.set_xlabel("X (lattice position)")
        dist_ax.set_ylabel(f"P(X, {N_JUMPS})")
        dist_ax.legend(loc="best")

        # Right panel: sampled MSD against the linear law, on log-log axes.
        expected_msd = step * np.arange(N_JUMPS, dtype=np.float64)
        msd_ax.plot(msd, "o", ms=2, label="Statistical")
        msd_ax.plot(expected_msd, "r", label="Analytical")
        msd_ax.set_xscale("log")
        msd_ax.set_yscale("log")
        msd_ax.set_xlabel("N (Number of steps)")
        msd_ax.set_ylabel("<$X^2$(N)>")
        msd_ax.legend(loc="best")

        fig.suptitle(
            f"Random walk of {N_JUMPS} steps (averaged over {N_CYCLES} cycles)",
            fontsize=16,
        )
        fig.subplots_adjust(wspace=0.3)
        fig.savefig(f"NJUMPS_{N_JUMPS}_CYCLES_{N_CYCLES}_{method}.{format_fig}")

In [None]:
def random_walk(N_JUMPS=100, N_CYCLES=10, method="unbiased"):
    """Run ``N_CYCLES`` cycles of 1000 walks, each ``N_JUMPS`` jumps long.

    Every walk starts at position 0.0. Each intermediate position is fed to
    the MSD sampler; only the final position of a walk goes into the lattice
    histogram.

    Args:
        N_JUMPS: number of jumps per walk.
        N_CYCLES: number of cycles (one progress-bar tick per cycle).
        method: step method name passed on to ``RandomStep``.

    Returns:
        Tuple ``(sample, lattice)`` holding the accumulated statistics.
    """
    # Step generator, end-point histogram and MSD accumulator.
    stepper = RandomStep(method)
    lattice = Lattice(N_JUMPS, stepper.step_var)
    sample = Sample()

    for _cycle in tqdm(range(N_CYCLES)):
        # 1000 walks per cycle; this has to agree with the normalisation that
        # Lattice.return_results applies to the histogram.
        for _walk in range(1000):
            sample.start_new_walk()
            curr_pos = 0.0
            for _jump in range(N_JUMPS):
                curr_pos = stepper.perform_random_step(curr_pos)
                sample.sample_walk(curr_pos)

            lattice.update_lattice(curr_pos)

    return sample, lattice

In [None]:
def main(N_JUMPS, N_CYCLES, method="unbiased"):
    """Simulate the random walk, collect its statistics and plot them.

    Args:
        N_JUMPS: number of jumps per walk.
        N_CYCLES: number of cycles to average over.
        method: step method name passed on to ``random_walk``.
    """
    # Simulation: N_CYCLES cycles of walks with N_JUMPS jumps each.
    sample, lattice = random_walk(N_JUMPS, N_CYCLES, method)

    # Statistics: MSD first, then (sites, histogram, discrete, continuous).
    msd = sample.return_results()
    histogram = lattice.return_results(N_CYCLES)

    # Figure: the four histogram arrays map onto the first four parameters.
    plot_results(
        *histogram,
        msd,
        method=method,
        N_CYCLES=N_CYCLES,
        N_JUMPS=N_JUMPS,
        step=lattice.step_var,
    )

In [None]:
class RandomStep(object):
    """Generator of single random-walk steps.

    Attributes:
        method: random step method (unbiased, biased or off_lattice);
            assignments are validated against ``METHODS``.
        step_var: step length variance <l^2>.

    perform_random_step(pos): given a position value `pos`,
        returns the updated position after performing a random step.
    """

    METHODS = ["unbiased", "biased", "off_lattice"]

    def __init__(self, method):
        """Select the step method.

        Raises:
            ValueError: if ``method`` is not one of ``METHODS``.
        """
        self.method = method  # goes through the validating setter below
        self.step_var = 1.0

    @property
    def method(self):
        """Name of the active step method."""
        return self._method

    @method.setter
    def method(self, m):
        if m not in self.METHODS:
            # ValueError is still an Exception, so existing handlers keep
            # working, but the message now says what was wrong.
            raise ValueError(f"Invalid method {m!r}; expected one of {self.METHODS}.")
        self._method = m

    def perform_random_step(self, pos):
        """Return the position reached from ``pos`` after one random step."""
        if self.method == "unbiased":
            # Left and right are equally likely.
            step = Direction.RIGHT if random() < 0.5 else Direction.LEFT
            return pos + step.value

        elif self.method == "biased":
            # Not implemented in this template (see EXERCISE_1): no move.
            return pos

        elif self.method == "off_lattice":
            # Not implemented in this template (see EXERCISE_2): no move.
            return pos

        else:
            # Only reachable if `_method` was set without the setter.
            return pos

# Run the program

In [None]:
# Baseline run: unbiased walk of 100 jumps, averaged over 10 cycles.
main(N_JUMPS=100, N_CYCLES=10)

# EXERCISE_1: add bias to the `RandomStep` class

In [None]:
class RandomStep(object):
    """Generator of single random-walk steps (EXERCISE_1 template).

    Attributes:
        method: random step method (unbiased, biased or off_lattice);
            assignments are validated against ``METHODS``.
        step_var: step length variance <l^2>.

    perform_random_step(pos): given a position value `pos`,
        returns the updated position after performing a random step.
    """

    METHODS = ["unbiased", "biased", "off_lattice"]

    def __init__(self, method):
        """Select the step method.

        Raises:
            ValueError: if ``method`` is not one of ``METHODS``.
        """
        self.method = method  # goes through the validating setter below
        self.step_var = 1.0

    @property
    def method(self):
        """Name of the active step method."""
        return self._method

    @method.setter
    def method(self, m):
        if m not in self.METHODS:
            # ValueError is still an Exception, so existing handlers keep
            # working, but the message now says what was wrong.
            raise ValueError(f"Invalid method {m!r}; expected one of {self.METHODS}.")
        self._method = m

    def perform_random_step(self, pos):
        """Return the position reached from ``pos`` after one random step."""
        if self.method == "unbiased":
            # Left and right are equally likely.
            step = Direction.RIGHT if random() < 0.5 else Direction.LEFT
            return pos + step.value

        elif self.method == "biased":
            # YOUR CODE HERE ...
            return pos

        elif self.method == "off_lattice":
            # Not part of this exercise: the position is returned unchanged.
            return pos

        else:
            # Only reachable if `_method` was set without the setter.
            return pos

# SOLUTION (execute this cell if you wish to see the solution)

In [None]:
# %load solutions/random_walk_bias.py

# Re-run with method 'biased'

In [None]:
# Same run with the `biased` step method; as long as that branch of
# RandomStep.perform_random_step is the unmodified template, the walker does not move.
main(N_JUMPS=100, N_CYCLES=10, method="biased")

# EXERCISE_2: add off-lattice to the `RandomStep` class

In [None]:
class RandomStep(object):
    """Generator of single random-walk steps (EXERCISE_2 template).

    Attributes:
        method: random step method (unbiased, biased or off_lattice);
            assignments are validated against ``METHODS``.
        step_var: step length variance <l^2>.

    perform_random_step(pos): given a position value `pos`,
        returns the updated position after performing a random step.
    """

    METHODS = ["unbiased", "biased", "off_lattice"]

    def __init__(self, method):
        """Select the step method.

        Raises:
            ValueError: if ``method`` is not one of ``METHODS``.
        """
        self.method = method  # goes through the validating setter below
        self.step_var = 1.0

    @property
    def method(self):
        """Name of the active step method."""
        return self._method

    @method.setter
    def method(self, m):
        if m not in self.METHODS:
            # ValueError is still an Exception, so existing handlers keep
            # working, but the message now says what was wrong.
            raise ValueError(f"Invalid method {m!r}; expected one of {self.METHODS}.")
        self._method = m

    def perform_random_step(self, pos):
        """Return the position reached from ``pos`` after one random step."""
        if self.method == "unbiased":
            # Left and right are equally likely.
            step = Direction.RIGHT if random() < 0.5 else Direction.LEFT
            return pos + step.value

        elif self.method == "biased":
            # YOUR CODE HERE FROM EXERCISE_1 ...
            return pos

        elif self.method == "off_lattice":
            # YOUR CODE HERE ...
            return pos

        else:
            # Only reachable if `_method` was set without the setter.
            return pos

# SOLUTION (execute this cell if you wish to see the solution)

In [None]:
# %load solutions/random_walk_off_lattice.py

# Re-run with method 'off_lattice'

In [None]:
# Same run with the `off_lattice` step method; as long as that branch of
# RandomStep.perform_random_step is the unmodified template, the walker does not move.
main(N_JUMPS=100, N_CYCLES=10, method="off_lattice")