In [9]:
from panel.interact import interact
import math

from scipy.stats import levy_stable, wrapcauchy
import numpy as np
import pandas as pd
import plotly.graph_objects as go
import panel as pn
import panel.widgets as pnw
pn.extension("plotly")

radio_group_trajectory = pn.widgets.RadioButtonGroup(name="Trajectory", options=["BM", "CRW", "LF"],
button_type="default")
n_steps = pnw.IntSlider(name="n_steps", width=50, value=100, step=1, start=100, end=100000)
metric = pnw.Select(name="metric", width=10, options=["PL", "MSD"])
x_pos = pnw.IntInput(name="x_pos", width=10, value=0, start=0, end=10000, step=1)
y_pos = pnw.IntInput(name="y_pos", width=10, value=0, start=0, end=10000, step=1)
speed = pnw.IntSlider(name="speed", width=50, value=6, step=1, start=3, end=1000)
crw_exponent = pnw.FloatInput(name="crw exponent", width=10, value=0.2, start=0.2, end=1, step=0.1)
alpha = pnw.FloatInput(name="alpha", value=0.7, width=10, start=0.1, end=2, step=0.1)
beta = pnw.FloatInput(name="beta", value=0, width=10, start=0, end=1, step=0.1)
location = pnw.IntSlider(name="loc", width=50, value=3, step=1, start=3, end=10)

# Trajectory data
trajectory_df = None

class Vec2d(object):
    """
    2d vector class, supports vector and scalar operators, and also provides a bunch of high level functions
    http://www.pygame.org/wiki/2DVectorClass
    """
    __slots__ = ['x', 'y']

    def __init__(self, x_or_pair, y=None):
        if y == None:
            self.x = x_or_pair[0]
            self.y = x_or_pair[1]
        else:
            self.x = x_or_pair
            self.y = y

    # Addition
    def __add__(self, other):
        if isinstance(other, Vec2d):
            return Vec2d(self.x + other.x, self.y + other.y)
        elif hasattr(other, "__getitem__"):
            return Vec2d(self.x + other[0], self.y + other[1])
        else:
            return Vec2d(self.x + other, self.y + other)

    # Subtraction
    def __sub__(self, other):
        if isinstance(other, Vec2d):
            return Vec2d(self.x - other.x, self.y - other.y)
        elif (hasattr(other, "__getitem__")):
            return Vec2d(self.x - other[0], self.y - other[1])
        else:
            return Vec2d(self.x - other, self.y - other)

    # Vector length
    def get_length(self):
        return math.sqrt(self.x**2 + self.y**2)

    # rotate vector
    def rotated(self, angle):
        cos = math.cos(angle)
        sin = math.sin(angle)
        x = self.x*cos - self.y*sin
        y = self.x*sin + self.y*cos
        return Vec2d(x, y)
    
def single_euclidean_distance(pointA: list, pointB: list) -> float:
    """
    Method to calculate euclidian distance from 2 points

    :param pointA: PointA[x, y] coordinates
    :type pointA: list
    :param pointB: PointB[x, y] coordinates
    :type pointB: list
    :return: Euclidean distance
    :rtype: float
    """
    ed = math.sqrt(math.pow((pointB[0] - pointA[0]), 2) + math.pow((pointB[1] - pointA[1]), 2))
    return ed

def brownian_motion(n_steps: int = 1000, speed: int = 6, x_pos: int= 0, y_pos = 0) -> np.ndarray:
    """
    Method to calculate BM trajectory

    :param n_steps: steps number, defaults to 1000
    :type n_steps: int, optional
    :param speed: speed, defaults to 6
    :type speed: int, optional
    :param x_pos: starting position in x, defaults to 0
    :type x_pos: int, optional
    :param y_pos: starting position in y, defaults to 0
    :type y_pos: int, optional
    :return: Trajectory values at np.ndarray
    :rtype: np.ndarray
    """
    # Init velocity
    velocity = Vec2d(x_or_pair=speed, y=0)
    # Init dataframe
    BM_2d_df = pd.DataFrame(columns=["x_pos", "y_pos"])
    temp_df = pd.DataFrame([{"x_pos": x_pos, "y_pos": y_pos}])

    BM_2d_df = pd.concat([BM_2d_df, temp_df], ignore_index=True)

    for i in range(n_steps-1):
        turn_angle = np.random.uniform(low=-np.pi, high=np.pi)  # TYPE OF DISTRIBUTION
        velocity = velocity.rotated(turn_angle)
        temp_df = pd.DataFrame([{"x_pos": BM_2d_df.x_pos[i]+velocity.x, "y_pos":  BM_2d_df.y_pos[i]+velocity.y}])
        BM_2d_df = pd.concat([BM_2d_df, temp_df], ignore_index=True)

    global trajectory_df
    trajectory_df = BM_2d_df.to_numpy()
    return plot_figure(trace=BM_2d_df.to_numpy(), plot_type="trajectory")

def correlated_random_walk(crw_exponent, n_steps: int = 1000, x_pos= 0, y_pos= 0, speed: int = 6) -> np.ndarray:
    """
    Method to calculate CRW trajectory

    :param crw_exponent: Exponent for CRW calculation
    :type crw_exponent: float
    :param n_steps: steps number for trajectory, defaults to 1000
    :type n_steps: int, optional
    :param x_pos: starting position in x, defaults to 0
    :type x_pos: int, optional
    :param y_pos: starting position in y, defaults to 0
    :type y_pos: int, optional
    :param speed: speed, defaults to 6
    :type speed: int, optional
    :return: Trajectory values at np.ndarray
    :rtype: np.ndarray
    """
    # Init velocity
    velocity = Vec2d(x_or_pair=speed, y=0)

    # Init dataframe
    CRW_2d_df = pd.DataFrame(columns=["x_pos", "y_pos"])

    # Concatenate aux
    temp_df = pd.DataFrame([{"x_pos": x_pos, "y_pos": y_pos}])
    CRW_2d_df = pd.concat([CRW_2d_df, temp_df], ignore_index=True)

    for i in range(n_steps - 1):
        turn_angle = wrapcauchy.rvs(c=crw_exponent)
        velocity = velocity.rotated(turn_angle)
        temp_df = pd.DataFrame([{"x_pos": CRW_2d_df.x_pos[i]+velocity.x, "y_pos":  CRW_2d_df.y_pos[i]+velocity.y}])
        CRW_2d_df = pd.concat([CRW_2d_df, temp_df], ignore_index=True)

    global trajectory_df
    trajectory_df = CRW_2d_df.to_numpy()
    return plot_figure(trace=CRW_2d_df.to_numpy(), plot_type="trajectory", traj_type="CRW")

def levy_flight(alpha: float = 1.0, beta: float = 1.0, speed: int = 1, n_steps: int = 100,
                x_pos = 0, y_pos= 0, c: float = 0.7, loc: float = 0) -> np.ndarray:
    """
    Method to calculate Levy Fligth trajectory

    :param alpha: Alpha parameter, defaults to 1.0
    :type alpha: float, optional
    :param beta: Beta parameter, defaults to 1.0
    :type beta: float, optional
    :param speed: speed, defaults to 1
    :type speed: int, optional
    :param n_samples: number of steps for trajectory, defaults to 100000
    :type n_samples: int, optional
    :param x_pos: starting position in x, defaults to 0
    :type x_pos: int, optional
    :param y_pos: starting position in y, defaults to 0
    :type y_pos: int, optional
    :param c: exponent for couchy calculation, defaults to 0.7
    :type c: float, optional
    :param loc: location, defaults to 0
    :type loc: float, optional
    :return: Trajectory values at np.ndarray
    :rtype: np.ndarray
    """

    # init velocity vector
    velocity = Vec2d(x_or_pair=1, y=0)  # inicia horizontal
    std_motion = speed

    # init DF
    LW_2d_df = pd.DataFrame([{"x_pos": x_pos, "y_pos": y_pos}])

    i = 1
    while i < n_steps:
        # Get random n_steps from levy
        step_size = levy_stable.rvs(alpha, beta, std_motion)
        # round to int and positive
        step_size = int(np.ceil(abs(step_size)))
        # angle
        theta = wrapcauchy.rvs(c=c, loc=loc)
        velocity = velocity.rotated(theta)
        for j in range(step_size):
            temp_df = pd.DataFrame([{"x_pos": LW_2d_df.x_pos[i-1] + velocity.x,
                                    "y_pos":  LW_2d_df.y_pos[i-1] + velocity.y}])
            # add to the end to levy df
            LW_2d_df = pd.concat([LW_2d_df, temp_df], ignore_index=True)
            i += 1
    global trajectory_df
    trajectory_df = LW_2d_df.to_numpy()
    return plot_figure(trace=LW_2d_df.to_numpy(), plot_type="trajectory", traj_type="LF")

def path_length_calculation() -> np.array:
    """
    Method to calculate path length

    :return: path lenght distribution array
    :rtype: np.array
    """
    dataframe = trajectory_df
    pl_complete = np.cumsum(euclidean_distance(dataframe))
    return plot_figure(trace=pl_complete, plot_type="pl")

def euclidean_distance(df: np.array) -> np.array:
    """
    Method to calculate euclidean distance from complete array

    :param df: array with all the trajectory coordinates
    :type df: np.array
    :return: array with euclidean distances
    :rtype: np.array
    """
    return np.array([math.sqrt(math.pow((df[i+1,0] - df[i,0]), 2) +
                               math.pow((df[i+1,1] - df[i,1]), 2))
                     for i in range(df.shape[0]-1)])

def calculate_mean_sqrt_displacement() -> np.array:
    """
    Method to calculate MSD

    :return: Mean sqrt displacement array
    :rtype: np.array
    """
    # Empty MSD
    N = trajectory_df.shape[0]
    dataframe = trajectory_df
    MSD = np.zeros([N])
    cum_sum = 0

    for tau in range(1, N):
        for i in range(1, N-tau+1):
            cum_sum += math.pow(single_euclidean_distance(dataframe[i+tau-1], dataframe[i-1]), 2)
        MSD[tau] = (cum_sum/(N-tau))
        cum_sum = 0
    return plot_figure(trace=MSD, plot_type="msd")

def plot_figure(trace, plot_type: str="trajectory", traj_type: str = "BM") -> go.Figure:
    """
    Method to plot the selected information

    :param traces: Array with plotting information
    :type traces: np.ndarray
    :param plot_type: Select trajectory, msd or pl plotting
    :type plot_type: str
    :param traj_type: Name for the plotting figure, defaults to "BM"
    :type traj_type: str, optional
    :return: figure to be plotted
    :rtype: go.Figure
    """
    figure = go.Figure()

    if plot_type == "trajectory":
        figure.add_trace(go.Scatter3d(x=trace[:,0],
                                      y=trace[:,1],
                                      z=np.arange(len(trace)),
                                      marker=dict(size=2),
                                      line=dict(color="blue", width=2),
                                      mode="lines",
                                      name=f"{traj_type}_3D",
                                      showlegend=True))
        figure.update_layout(title_text=f"Trajectory {traj_type} 3D",  scene = dict(
                    xaxis_title="x_pos(m",
                    yaxis_title="y_pos(m)",
                    zaxis_title="time(s)"),
                    autosize=True)
    elif plot_type == "msd":
        figure.add_trace(go.Scatter(
            x=np.arange(len(trace)),
            y=trace,
            marker=dict(size=2),
            line=dict(width=2),
            mode="lines",
            name="MSD",
            showlegend=True
        ))
        figure.update_layout(title_text="Metrics MSD")
        figure.update_xaxes(title_text="x_pos(m)")
        figure.update_yaxes(title_text="y_pos(m)")
    else:
        figure.add_trace(go.Scatter(
        x = np.arange(len(trace)),
        y = trace,
        marker = dict(size=2),
        line = dict(width=2),
        mode = "lines",
        name = f"path_length",
        showlegend = True
    ))
        figure.update_layout(title_text="Metrics PL")
        figure.update_xaxes(title_text="x_pos(m)")
        figure.update_yaxes(title_text="y_pos(m)")
    return figure

@pn.depends(radio_group_trajectory)
def create_options(trajectory):
    if trajectory == "BM":
        return pn.Column(pn.Row(n_steps), pn.Row(x_pos, y_pos), pn.Row(speed), pn.Row(metric))
    if trajectory == "CRW":
        return pn.Column(pn.Row(n_steps), pn.Row(x_pos, y_pos), pn.Row(speed), pn.Row(crw_exponent), pn.Row(metric))
    elif trajectory == "LF":
        return pn.Column(pn.Row(n_steps), pn.Row(x_pos, y_pos), pn.Row(speed), pn.Row(alpha, beta), pn.Row(location), pn.Row(metric))

@pn.depends(radio_group_trajectory, n_steps, x_pos, y_pos, speed, crw_exponent, alpha, beta, location)
def trajectories(radio_group_trajectory, n_steps, x_pos, y_pos, speed, crw_exponent, alpha, beta, location):
    if radio_group_trajectory=="BM":
        return brownian_motion(n_steps, speed, x_pos, y_pos)
    elif radio_group_trajectory=="CRW":
        return correlated_random_walk(crw_exponent, n_steps, x_pos, y_pos, speed)
    elif radio_group_trajectory=="LF":
        return levy_flight(alpha, beta, speed, n_steps, x_pos, y_pos, crw_exponent, location)


@pn.depends(radio_group_trajectory, metric, n_steps, x_pos, y_pos, speed, crw_exponent, alpha, beta, location)
def metrics(radio_group_trajectory, metric_value, n_steps, x_pos, y_pos, speed, crw_exponent, alpha, beta, location):
    if metric_value=="MSD":
        return calculate_mean_sqrt_displacement()
    else:
        return path_length_calculation()

layout = interact(create_options, trajectory=radio_group_trajectory, throttled=True)
pn.Column(pn.Row(layout, trajectories, metrics))
