# Random Walk Metrics


## Modules

In [98]:
import numpy as np
import pandas as pd
import math

from scipy.spatial import distance

from scipy.stats  import wrapcauchy
from scipy.stats import levy_stable

import plotly.graph_objects as go


## Class

In [99]:
################# http://www.pygame.org/wiki/2DVectorClass ##################
class Vec2d(object):
    """2d vector class, supports vector and scalar operators,
    and also provides a bunch of high level functions
    """
    __slots__ = ['x', 'y']

    def __init__(self, x_or_pair, y = None):
        if y == None:            
            self.x = x_or_pair[0]
            self.y = x_or_pair[1]
        else:
            self.x = x_or_pair
            self.y = y
            
    # Addition
    def __add__(self, other):
        if isinstance(other, Vec2d):
            return Vec2d(self.x + other.x, self.y + other.y)
        elif hasattr(other, "__getitem__"):
            return Vec2d(self.x + other[0], self.y + other[1])
        else:
            return Vec2d(self.x + other, self.y + other)

    # Subtraction
    def __sub__(self, other):
        if isinstance(other, Vec2d):
            return Vec2d(self.x - other.x, self.y - other.y)
        elif (hasattr(other, "__getitem__")):
            return Vec2d(self.x - other[0], self.y - other[1])
        else:
            return Vec2d(self.x - other, self.y - other)
    
    # Vector length
    def get_length(self):
        return math.sqrt(self.x**2 + self.y**2)
    
    # rotate vector
    def rotated(self, angle):        
        cos = math.cos(angle)
        sin = math.sin(angle)
        x = self.x*cos - self.y*sin
        y = self.x*sin + self.y*cos
        return Vec2d(x, y)

## Activity 1 - Path length - (BM1 vs BM2 vs CRW)
    - Write a function that returns a Brownian Motion (BM) trajectory in pandas df.
    - Write a function that returns a Correlated Random Walk (CRW) trajectory in pandas df.
    - Write a function that returns the path length for a given trajectory.
    - Compare at least the path length of three trajectories as shown in the figure below.
    - Display the results using plotly.

In [100]:
# Function to get Brownian Motion trajectory, using random 90 degree rotation angle directions
def bm_traj(speed=3, n_steps=1000, s_pos=[0,0]):
    trajectory_bw = pd.DataFrame(columns=['x_pos','y_pos'])
    temp_df = pd.DataFrame([{'x_pos':s_pos[0], 'y_pos':s_pos[1]}])

    trajectory_bw = pd.concat([trajectory_bw, temp_df], ignore_index=True)

    velocity = Vec2d(speed, 0) # Starting position for velocity vector

    for i in range(n_steps):
        turn_angle = np.random.choice([0, np.pi/2, np.pi, 3*np.pi/2])
        
        velocity = velocity.rotated(turn_angle)
        
        temp_df = pd.DataFrame([{'x_pos':trajectory_bw.x_pos[i] + velocity.x, 'y_pos':trajectory_bw.y_pos[i] + velocity.y}])
        trajectory_bw = pd.concat([trajectory_bw, temp_df], ignore_index=True)

    return trajectory_bw
    

In [101]:
# Function to get correlated random walk trajectory using Wrapped Cauchy Distribution for turning angles on each step/iteration
def crw_traj(speed=5, coefficient=0.4, n_steps=1000, s_pos=[0,0]):
    
    r = wrapcauchy.rvs(loc=0, c=coefficient, size=n_steps)

    velocity = Vec2d(speed, 0) # Velocity vector in starting position

    trajectory_crw = pd.DataFrame(columns=['x_pos', 'y_pos'])
    temp_df = pd.DataFrame([{'x_pos':s_pos[0],'y_pos':s_pos[1]}])

    trajectory_crw = pd.concat([trajectory_crw,temp_df], ignore_index=True) 

    for i in range(n_steps):
        turn_angle = r[i]
        
        velocity = velocity.rotated(turn_angle)
        
        temp_df = pd.DataFrame([{'x_pos':trajectory_crw.x_pos[i] + velocity.x, 'y_pos':trajectory_crw.y_pos[i] + velocity.y}])
        trajectory_crw = pd.concat([trajectory_crw,temp_df], ignore_index=True)
        
    return trajectory_crw
    

In [102]:
# Function to get the path length of a given dataframe trajectory
def pl_traj(df, s_pos=[0]):
    pl = pd.DataFrame(columns=['y_pos'])
    temp_df = pd.DataFrame([{'y_pos':s_pos[0]}])

    pl = pd.concat([pl, temp_df], ignore_index=True)

    for i in range(len(df)-1): # minus one because len counts the dataframe header
        temp_df = pd.DataFrame([{'y_pos':pl.y_pos[i] + distance.euclidean(df.iloc[i], df.iloc[i+1])}])
        pl = pd.concat([pl, temp_df], ignore_index=True)
        
    return pl


In [103]:
# Obtaining the correspondent trajectories
bm_tr_3 = bm_traj(3)
bm_tr_6 = bm_traj(6)
crw_tr_6 = crw_traj(6, coefficient=0.9)

# Obtaining path length dataframe for each trajectory
bm_pl_3 = pl_traj(bm_tr_3)
bm_pl_6 = pl_traj(bm_tr_6)
crw_pl_6 = pl_traj(crw_tr_6)

In [160]:
# Plotting of three path length trajectories for both Brownian Motion and Correlated Random Walks
fig = go.Figure()

fig.add_trace(go.Scatter(
    x= bm_pl_3.index,
    y= bm_pl_3.y_pos,
    name='BM - 3',
    mode='lines',
    showlegend=True
))

fig.add_trace(go.Scatter(
    x= bm_pl_6.index,
    y= bm_pl_6.y_pos,
    name='BM - 6',
    marker = dict(size=2),
    line = dict(width=6),
    mode='lines',
    showlegend=True
))

fig.add_trace(go.Scatter(
    x= crw_pl_6.index,
    y= crw_pl_6.y_pos,
    name='CRW 0.9 - 6',
    mode='lines',
    showlegend=True
))

fig.update_layout(
    title='Path Lengths',
    height=800,
    xaxis_title="times",
    yaxis_title="distnace",
    bargap=0.4
)

fig.show()
    

## Activity 2 Mean Squared Displacement - (BM vs CRW)
    - Write a function that returns the mean squared displacement for a given trajectory.
    - Compare the mean squared displacement curves of at least two trajectories of
    different kinds, as shown in the figure below.
    - Display the results using plotly.

In [87]:

def msd(df, s_pos=0):
    n = 1 # MSD Formula n initial value
    msd_df = pd.DataFrame(columns=['y_pos'])
    temp_df = pd.DataFrame([{'y_pos':s_pos}])
    msd_df = pd.concat([msd_df, temp_df], ignore_index=True)
    
    for n in range(1,len(df)-1):
        
        eucladean_distances = np.array([distance.euclidean(df.iloc[i-n],df.iloc[i]) for i in range(n, df.shape[0],1)])
        dist_squared = eucladean_distances**2
        msd = np.mean(dist_squared)
        
        
        temp_df = pd.DataFrame([{'y_pos':msd}])
        msd_df = pd.concat([msd_df, temp_df], ignore_index=True)
        
    return msd_df

bw_msd_df = msd(bm_tr_3)
crw_msd_df = msd(crw_tr_6)
fig = go.Figure()

fig.add_trace(go.Scatter(
    x= bw_msd_df.index,
    y= bw_msd_df.y_pos,
    name='BM - 3 - MSD',
    mode='lines',
    showlegend=True
))

fig.add_trace(go.Scatter(
    x= crw_msd_df.index,
    y= crw_msd_df.y_pos,
    name='CRW - 6 - MSD',
    mode='lines',
    showlegend=True
))


fig.update_layout(
    title='Mean Squared Distances',
    height=800,
    xaxis_title="times",
    yaxis_title="msd per position",
)

fig.show()

## Activity 3 - Turning-angle Distribution - (source dist. vs observed dist.)
    - Consider two CRW trajectories with different Cauchy coefficients.
    - Write a function that returns the turning angles for a given trajectory.
    - Compare the observed distribution (histogram) to the source distribution (curve) for
    both trajectories, as shown in the figure below.
    - Display the results using plotly. 

In [133]:
# Consider two CRW trajectories with different Cauchy coefficients.
crw_06 = crw_traj(speed=6, coefficient=0.6)
crw_09 = crw_traj(speed=6, coefficient=0.9)
# Write a function that returns the turning angles for a given trajectory.

def turning_angles(trajectory):
    turning_angles = []
    for i in range(1, len(trajectory)-1):
        # Calculate the turning angle
        a = trajectory.iloc[i-1]
        b = trajectory.iloc[i]
        c = trajectory.iloc[i+1]
        ab = b - a
        bc = c - b
        cross_product = np.cross(ab, bc)
        orient = np.sign(cross_product)
        if orient == 0:
            orient = 1
            
        denominator = np.linalg.norm(ab) * np.linalg.norm(bc) + np.finfo(float).eps #to avoid zeros
        
        angle = np.arcsin(np.linalg.norm(cross_product) / denominator)
        
        new_angle = angle * orient
        turning_angles.append(round(new_angle, 2))
        
    return pd.DataFrame(turning_angles, columns=['turning_angle'])
# Compare the observed distribution (histogram) to the source distribution (curve) for both trajectories.

# Calculate turning angles for both trajectories
turning_angles_1 = turning_angles(crw_06)
turning_angles_2 = turning_angles(crw_09)



x_values_angles_1 = np.linspace(min(turning_angles_1.turning_angle), max(turning_angles_1.turning_angle), 1000)
y_values_angles_1 = np.array([wrapcauchy.pdf(abs(i), 0.6) for i in x_values_angles_1])

scaling_factor = 100
y_values_angles_1 *= scaling_factor

x_values_angles_2 = np.linspace(min(turning_angles_2.turning_angle), max(turning_angles_2.turning_angle), 1000)
y_values_angles_2 = np.array([wrapcauchy.pdf(abs(i), 0.9) for i in x_values_angles_2])

scaling_factor = 100
y_values_angles_2 *= scaling_factor

# Plot histograms using Plotly
fig_hist = go.Figure()
fig_hist.add_trace(go.Histogram(
    y=turning_angles_1.index,
    x=turning_angles_1.turning_angle,
    name='CRW 0.6 observable',
    marker_color='blue'
))
fig_hist.add_trace(go.Histogram(
    y=turning_angles_2.index,
    x=turning_angles_2.turning_angle,
    name='CRW 0.9 observable',
    marker_color='red'
))
fig_hist.add_trace(go.Scatter(
    x = x_values_angles_1,
    y = y_values_angles_1,
    mode = 'lines',
    name='CRW - 0.9 PDF',
    showlegend=True
))

fig_hist.add_trace(go.Scatter(
    x = x_values_angles_2,
    y = y_values_angles_2,
    mode = 'lines',
    name='CRW - 0.9 PDF',
    showlegend=True
))
fig_hist.update_layout(title='Turning Angles Histogram')

fig_hist.show()

## Activity 4 - Step-length Distribution - (source dist. vs observed dist.)
    - Write a function that returns a Lévy Walk (LW) trajectory in pandas df.
    - Consider two LW trajectories with different alpha coefficients.
    - Write a function that restaurants the step lengths for a given trajectory.
    - Compare the observed distribution (histogram) to the source distribution (curve) for
    both trajectories, as shown in the figure below.
    - Display the results using plotly.

In [162]:


def levy_traj(alpha = 1.5, beta=0, m = 0, speed=10,s_pos = [0,0,0], n_steps=1000, time_per_step=0.0004):
    # Initialize the dataFrame to store x, y values to trace trajectory, then set initial position for Data Frame
    trajectory = pd.DataFrame(columns=['x_pos', 'y_pos', 'z_pos'])
    temp_df = pd.DataFrame([{'x_pos':s_pos[0], 'y_pos':s_pos[1], 'z_pos':s_pos[2]}]) 

    trajectory = pd.concat([trajectory,temp_df], ignore_index=True)

    # Initialize the velocity vector with initial position
    velocity = Vec2d(speed, 0)

    # Random values using Lévy algorithm
    r = levy_stable.rvs(alpha, beta, loc=m, size=n_steps) #angle rotation values

    # trajectory x and y values are defined by lévy distribution random values.
    # On each iteration the difference between each step is calculated multiplying the value returned by the velocity 
    # vector with the current value of the random lavue from lévy distribution.
    for i in range(n_steps):
        step_length = r[i]
        
        # Rotation angles for the vector are also random values from lévy distribution
        velocity = velocity.rotated(step_length)
        
        length_traveled = velocity.get_length() * step_length * time_per_step # times values (z)
        
        # new position values are stored in the trajectory dataframe
        temp_df = pd.DataFrame([{
            'x_pos':trajectory.x_pos[i] + velocity.x * step_length, 
            'y_pos':trajectory.y_pos[i] + velocity.y * step_length, 
            'z_pos':trajectory.z_pos[i] + length_traveled
            }]
        )
        trajectory = pd.concat([trajectory, temp_df], ignore_index=True)
        
    return trajectory, r

def step_lengths(lev_traj, rot_ang): #recieves levy walk, and the turning angles from levy distribution random values used on the given levy walk
    step_lengths = []
    step_length = 1
    for i in range(len(lev_traj)-1):
        if (rot_ang[i] < 10 and rot_ang[i] > -10 ): # Is the angle approaching to zero?
            step_length += 1 # If it does, sum
        else:
            step_lengths.append(step_length)
            step_length = 1

    return pd.DataFrame(step_lengths, columns=['step_length'])

# Levy walks trajectories with alpha values of 0.7 and 1            
levy_traj_1, rot_angles_1 = levy_traj(alpha=1, beta=1, m=5)
levy_traj_07, rot_angles_07 = levy_traj(alpha=0.7, beta=1, m=5)

# Step lengths for both 1 and 0.7 levy walks
levy_1_sls = step_lengths(levy_traj_1, rot_angles_1)
levy_07_sls = step_lengths(levy_traj_07, rot_angles_07)

# X values to show the levy walk
times = np.linspace(0,50, len(levy_1_sls))

# Probability density functions
levy_pdf_1 = levy_stable.pdf(times, alpha=1, beta=1,loc=3)
levy_pdf_07 = levy_stable.pdf(times, alpha=0.7, beta=1, loc=0)


scaling_factor = 150
levy_pdf_1 *= scaling_factor
levy_pdf_07 *= scaling_factor

# Plot histograms using Plotly
fig_hist = go.Figure()
fig_hist.add_trace(go.Histogram(
    y=times,
    x=levy_1_sls.step_length,
    name='Levy - Alpha 1.0 Beta 1.0 observable',
    marker_color='lightsalmon',
))

fig_hist.add_trace(go.Histogram(
    y=times,
    x=levy_07_sls.step_length,
    name='Levy - Alpha 0.7 Beta 1.0 observable',
    marker_color='skyblue'
))

fig_hist.add_trace(go.Scatter(
    x = times,
    y = levy_pdf_1,
    mode = 'lines',
    name='Levy Walk - Alpha 1.0 Beta 1.0 PDF',
    marker_color='darkred',
    showlegend=True
))

fig_hist.add_trace(go.Scatter(
    x = times,
    y = levy_pdf_07,
    mode = 'lines',
    name='Levy Walk - Alpha 0.7 Beta 1.0 PDF',
    marker_color='darkblue',
    showlegend=True
))

fig_hist.update_layout(
    title='Levy Walks Step Length distribution',
    bargap=0.4
    )

fig_hist.show()