In [None]:
import random
import math
import numpy as np
import matplotlib.pyplot as plt
import pylab
import scipy
import pandas as pd
import seaborn as sns

import scienceplots

import statistics

import os.path
from IPython.display import clear_output

from arch.unitroot import PhillipsPerron, ADF

In [None]:
def backwards_sadf(time_series, r0, r2):
    min_window = int(len(time_series) * r0)
    ending_point = int(len(time_series) * r2) + 1
    
    stp_size = 5
    res = None
    res_win_size = None
    
    for win_size in range(min_window, ending_point, stp_size):
        adf_test = ADF(time_series[ending_point - win_size : ending_point])
        if res == None or res < adf_test.stat:
            res = adf_test.stat
            res_win_size = win_size
        
    return res, res_win_size
        
    
    
def gsadf(time_series, r0):
    stp = 0.01
    
    r2 = r0 + stp
    
    start = None
    
    results = []
    
    while r2 < 1:
        end_point = r2 * len(time_series)
        sadf, win_size = backwards_sadf(time_series, r0, r2)
        
        results.append((end_point - win_size, end_point, sadf))
                
        r2 += stp
        
    return results

In [None]:
def get_pf_deviation(price_hist, pf_hist):
    fundamental_dev = 0
    spec_score_max = 0
    
    for i in range(len(price_hist)):
        v = (price_hist[i] - pf_hist[i]) ** 2
        fundamental_dev += v
        
    fundamental_dev /= len(price_hist)
    
    return fundamental_dev ** 0.5


In [None]:
def hill_estimator(returns, proportion):
    # https://www.sciencedirect.com/science/article/pii/S0165188907000036#bib14
    
    returns = sorted(returns)
    
    n = len(returns)
    m = int(n * proportion)
    sig = 0
    for i in range(1, m + 1):
        sig += math.log(returns[n - i]) - math.log(returns[n - m - 1])
    sig /= m
    
    if sig == 0:
        return 0
    return 1 / sig
    

In [None]:
def boxplot_multiple(data, title=None, x='x', y='PPS', hue='Trader', x_rotation=0):
    df = pd.DataFrame(data)

    fig, ax = plt.subplots()

    ax.set_ylabel(y, fontsize=15, labelpad=3)
    ax.set_xlabel(x, fontsize=15, labelpad=4)
    
    width = 10
    height = 6
    fig.set_size_inches(width, height)

    sns.boxplot(ax=ax, x=x, y=y, hue=hue, data=df, palette="Set1", dodge=True, gap=.2, width=.5)
    
    if x_rotation != 0:
        plt.setp(ax.get_xticklabels(), rotation=x_rotation, ha="right", rotation_mode="anchor", fontsize = 14)
        
    ax.autoscale(tight=False)
    
    ax.legend(prop={'size': 13})
    
    
    ax.spines[['right', 'top']].set_visible(False)
    
    ax.get_xaxis().tick_bottom()
    ax.get_yaxis().tick_left()
    
    plt.xticks(fontsize=8, rotation=0)
    plt.yticks(fontsize=8, rotation=0)

In [None]:
def boxplot_two_axes(data, title=None, x='x', y='PPS', hue='Trader', x_rotation=0, titles = None):
    with plt.style.context(['science', 'ieee', 'no-latex']):
        fig = plt.figure(figsize=(10, 6))
        ax = fig.add_subplot(111)

        sns.set_context(rc = {'patch.linewidth': 1.0})
    
        data1 = {x: [], y:[], hue:[]}
        data2 = {x: [], y:[], hue:[]}

        y1 = None if titles == None else titles[0]
        y2 = None if titles == None else titles[1]

        for i in range(len(data[hue])):
            if data[hue][i] == data[hue][0]:
                y1 = data[hue][i] if y1 == None else y1

                data2[x].append(data[x][i])
                data2[y].append(None)
                data2[hue].append(data[hue][i])

                data1[x].append(data[x][i])
                data1[y].append(data[y][i])
                data1[hue].append(data[hue][i])

            else:
                y2 = data[hue][i] if y2 == None else y2

                data2[x].append(data[x][i])
                data2[y].append(data[y][i])
                data2[hue].append(data[hue][i])

                data1[x].append(None)
                data1[y].append(None)
                data1[hue].append(data[hue][i])

        ax.set_ylabel(y1, fontsize=20, labelpad=8)
        ax.set_xlabel(x, fontsize=20, labelpad=8)

        df = pd.DataFrame(data1)
        sns.violinplot(ax=ax, x=x, y=y, hue=hue, data=df, palette="Set1", dodge=True, gap=.2, width=.5, inner = None)

        if x_rotation != 0:
            plt.setp(ax.get_xticklabels(), rotation=x_rotation, ha="right", rotation_mode="anchor", fontsize = 14)

        ax.autoscale(tight=False)

        ax.legend(prop={'size': 19}, loc = (0.12, 0.81))

        ax.get_xaxis().tick_bottom()
        ax.get_yaxis().tick_left()

        plt.yticks(fontsize=12, rotation=0)
        plt.xticks(fontsize=13, rotation=0)
        
        ax2 = ax.twinx()
        df = pd.DataFrame(data2)
        
        sns.barplot(ax=ax2, estimator=np.mean, ci=85, x=x, y=y, hue=hue, data=df, palette="Set1", dodge=True, gap=.2, width=.5, edgecolor = "black")

        ax2.set_ylabel(y2, fontsize=20, labelpad=8)

        ax2.legend_ = None
        
        plt.yticks(fontsize=12, rotation=0)
        plt.show()
    
    

In [None]:
def barplot_multiple(data, title=None, x='x', y='PPS', hue='Trader', x_rotation=0):
    df = pd.DataFrame(data)

    with plt.style.context(['science', 'ieee', 'no-latex']):
        fig = plt.figure(figsize=(10, 6))
        ax = fig.add_subplot(111)
        
        sns.set_context(rc = {'patch.linewidth': 1.0})
        ax.set_ylabel(y, fontsize=20, labelpad=8)
        ax.set_xlabel(x, fontsize=20, labelpad=8)
    

        sns.barplot(ax=ax, x=x, y=y, hue=hue, data=df, palette="Set1", dodge=True, gap=.2, width=.5, edgecolor = "black")
    
        if x_rotation != 0:
            plt.setp(ax.get_xticklabels(), rotation=x_rotation, ha="right", rotation_mode="anchor", fontsize = 14)
    
        ax.autoscale(tight=False)
    
        ax.legend(prop={'size': 19})
    
        ax.get_xaxis().tick_bottom()
        ax.get_yaxis().tick_left()
    
        plt.yticks(fontsize=12, rotation=0)
        plt.xticks(fontsize=13, rotation=0)
        # ax.legend()
        plt.show()

In [None]:

PARAMETER_SET = 4

if PARAMETER_SET == 1:
    original_a1 = 0.3   #Importance individuals place on majority opinion  (alpha1)
    a2 = 0.2   #Importance of actual price trend  (alpha2)
    a3 = 0.5   #Pressure exerted by profit  (alpha3)

    v1 = 3   #Frequency of reevaluation of opinion
    v2 = 2   #Frequency of transition

    tc = 0.02   #Number of units that chartists buy or sell
    
    mu = 0.05

    gamma = 0.01    #Reaction strength (gamma)

    B = 6    #Speed of auctioneer (Beta)

    
    
elif PARAMETER_SET == 2:
    original_a1 = 0.9
    a2 = 0.25
    a3 = 1
    
    v1 = 4
    v2 = 1

    tc = 0.015  
    
    mu = 0.1

    gamma = 0.01    

    B = 4
    
elif PARAMETER_SET == 3:
    original_a1 = 0.75
    a2 = 0.25
    a3 = 0.75
    
    v1 = 0.5
    v2 = 0.5

    tc = 0.01
    
    mu = 0.1

    gamma = 0.02   

    B = 2
else:
    original_a1 = 0.8
    a2 = 0.2
    a3 = 1
    
    v1 = 2
    v2 = 0.6
    
    tc = 0.02
    
    mu = 0.05

    gamma = 0.01

    B = 4

    
a1 = 0
    
N = 500
r = 0.004    #Nominal dividends of the asset
R = 0.0004    #Average real returns from other investments 

s = 0.75    #Discount factor
sigma = 0.00   #Magnitude of fundamental value change


delta_t = 0.01   #Time interval 
plag = int((1 / delta_t) / 5)

In [None]:
from enum import Enum

class State(Enum):
    FUNDAMENTALIST = 1
    OPTIMIST = 2
    PESSIMIST = 3

In [None]:
class Agent:
    state = None
    parent = None
    
    counts = {State.FUNDAMENTALIST: 0, State.OPTIMIST: 0, State.PESSIMIST: 0}
    parent_community = None
    children = []
    
    def __init__(self, state, parent = None, has_children = False):
        
        self.set_state(state)
            
        self.parent_community = parent
        self.children = []
    
    
     
    # Only children count towards agent count calculations
    def set_state(self, state):
        if self.state != None and Agent.counts[self.state] <= 4:
            return
        
        Agent.counts[state] += 1
        if self.state != None:
            Agent.counts[self.state] -= 1 
            
        self.state = state
        
    
    def update_state(self, U1, U21, U22):
        
        
            
        total_community = self.parent_community.counts[State.OPTIMIST] + self.parent_community.counts[State.PESSIMIST]
        
        if total_community != 0:
            
            community_optimists = (self.parent_community.counts[State.OPTIMIST]) / total_community
            community_pessimists = (self.parent_community.counts[State.PESSIMIST]) / total_community
        
            
            U1 += b * (community_optimists - community_pessimists)
        
        ratio_optimists = Agent.counts[State.OPTIMIST] / N
        ratio_pessimists = Agent.counts[State.PESSIMIST] / N
        ratio_fundamentalists = Agent.counts[State.FUNDAMENTALIST] / N
        z = ratio_optimists + ratio_pessimists  #The fraction of agents that are chartists
        
        p_optimist = 0
        p_pessimist = 0
        p_fundamentalist = 0
        
        tie_breaker = random.random() <= 0.5
        
        
        if self.state == State.PESSIMIST:
            p_optimist = v1 * (z * math.exp(U1)) * delta_t if tie_breaker else 0
            p_fundamentalist = v2 * (ratio_fundamentalists * math.exp(-U22)) * delta_t if not tie_breaker else 0
            
        elif self.state == State.OPTIMIST:
            p_pessimist = v1 * (z * math.exp(-U1)) * delta_t if tie_breaker else 0
            p_fundamentalist = v2 * (ratio_fundamentalists * math.exp(-U21)) * delta_t if not tie_breaker else 0
            
            
        elif self.state == State.FUNDAMENTALIST:
            p_optimist = v2 * (ratio_optimists * math.exp(U21)) * delta_t if tie_breaker else 0
            p_pessimist = v2 * (ratio_pessimists * math.exp(U22)) * delta_t if not tie_breaker else 0
            
        
        prob = random.random()
        
        if prob <= p_fundamentalist:
            self.set_state(State.FUNDAMENTALIST)
        elif prob <= p_optimist:
            self.set_state(State.OPTIMIST)
        elif prob <= p_pessimist:
            self.set_state(State.PESSIMIST)
        
    def reset():
        for state in Agent.counts:
            Agent.counts[state] = 0

In [None]:
class Community:
    states = {State.FUNDAMENTALIST: 0, State.OPTIMIST: 0, State.PESSIMIST: 0}
    counts = {State.FUNDAMENTALIST: 0, State.OPTIMIST: 0, State.PESSIMIST: 0}
    
    children = []
    parent = None
    
    
    def __init__(self, parent):
        self.states = {State.FUNDAMENTALIST: 0, State.OPTIMIST: 0, State.PESSIMIST: 0}
        self.counts = {State.FUNDAMENTALIST: 0, State.OPTIMIST: 0, State.PESSIMIST: 0}
        self.children = []
        self.parent = parent
    
    
    def update_counts(self):
        
        self.counts = {State.FUNDAMENTALIST: 0, State.OPTIMIST: 0, State.PESSIMIST: 0}
        if self.parent != None:
            for s in self.counts:
                self.counts[s] = self.parent.counts[s] * info_efficiency + 0.5 * self.states[s] 
            
            
    def update_counts_corrupted(self):
        
        self.update_counts()
        self.counts[State.OPTIMIST] = corruption_S * (self.counts[State.OPTIMIST] + self.counts[State.PESSIMIST] + self.counts[State.FUNDAMENTALIST])
                
                
    def update_states(self, level):
        for s in self.states:
            self.states[s] = 0
            
        for child in self.children:
            if isinstance(child, Agent):
                self.states[child.state] += 1 / len(self.children)
            else:
                for s in self.states:
                    self.states[s] += child.states[s] / len(self.children)
                    
                    
        if level == 1 and self.states[State.OPTIMIST] > self.states[State.PESSIMIST]:
            self.states[State.OPTIMIST] *= (1 + echo_chamber)
            
        elif symmetric_echo and level == 1 and self.states[State.OPTIMIST] < self.states[State.PESSIMIST]:
            self.states[State.PESSIMIST] *= (1 + echo_chamber)
                    
    def update_states_corrupted(self, level):
        
        self.update_states(level)
        self.states[State.OPTIMIST] = corruption_S * (self.states[State.OPTIMIST] + self.states[State.PESSIMIST] + self.states[State.FUNDAMENTALIST])

In [None]:
percentage_chartists = 10

class Market:
    pf = 10   #Fundamental value
    
    agents = {}
    price = 0
    
    
    def __init__(self):
        for lvl in range(levels):
            self.agents[lvl] = []
            
        self.pf = 10
        self.price = self.pf
        Agent.reset()
        
        self.create_agents(levels - 1, None)
    
    def create_agents(self, level, parent):
        curr = None
        
        p = random.random()
        
        if level == 0:
            if p < percentage_chartists / 200:
                curr = Agent(State.OPTIMIST, parent, level != 0)
            elif p < percentage_chartists / 100:
                curr = Agent(State.PESSIMIST, parent, level != 0)
            else:
                curr = Agent(State.FUNDAMENTALIST, parent, level != 0)
                
        else:
            
            curr = Community(parent)
            for i in range(children_per_agent):
                child = self.create_agents(level - 1, curr)
                curr.children.append(child)
            
                
        self.agents[level].append(curr)
        return curr
    
    
    def update_agent_states(self, price_trend):
        n_chartists = Agent.counts[State.OPTIMIST] + Agent.counts[State.PESSIMIST]
        x = (Agent.counts[State.OPTIMIST] - Agent.counts[State.PESSIMIST]) / n_chartists  #The overall opinion of the chartists
        
        asset_portfolio_return = (r + price_trend / v2) / self.price
        
        
        U1 = (a1 * x) + (a2 * price_trend / v1)  #Decision factor (Between chartists)
        
        fundamentalist_profit = s * abs(self.price - self.pf) / self.price
        optimist_profit = asset_portfolio_return - R
        pessimist_profit = R - asset_portfolio_return
        
        U21 = a3 * (optimist_profit - fundamentalist_profit) #Decision factor (Between fundamentalists and optimists)
        U22 = a3 * (pessimist_profit - fundamentalist_profit) #Decision factor (Between fundamentalists and pessimists)
        
        for a in self.agents[0]:
            a.update_state(U1, U21, U22)
        
    
    def update_community_states(self, t):
        if b == 0:
            return
        
        # Backward Pass
        for lvl in range(1, levels):
            for i in range(len(self.agents[lvl])):
                a = self.agents[lvl][i]
                
                if lvl == corruption_L and i == 0 and t >= corruption_t0 and t <= corruption_t1:
                    a.update_states_corrupted(lvl)
                else:
                    a.update_states(lvl)
                
        
        # Forward_Pass
        for lvl in range(levels - 1, 0, -1):
            
            
            for i in range(len(self.agents[lvl])):
                
                a = self.agents[lvl][i]
                
                if lvl == corruption_L and i == 0 and t >= corruption_t0 and t <= corruption_t1:
                    a.update_counts_corrupted()
                else:
                    a.update_counts()
        
            
    
    def update_price(self):
        excess_demand_c = (Agent.counts[State.OPTIMIST] - Agent.counts[State.PESSIMIST]) * tc
        excess_demand_f = Agent.counts[State.FUNDAMENTALIST] * gamma * (self.pf - self.price)
        excess_demand = excess_demand_c + excess_demand_f
        
        noise = np.random.normal(0, mu)
        p_price_up = max(0, B * (excess_demand + noise))
        p_price_down = -min(0, B * (excess_demand + noise))
        
        price_change = 0
        
        prob = random.random()
        
        if prob <= p_price_up:
            price_change += 0.2 * delta_t
        
        if prob <= p_price_down:
            price_change -= 0.2 * delta_t
        
        self.price += price_change
        
    
    
    def run(self, n_iterations, prog_print = True):
        price_change = 0
        price_hist = []
        pf_hist = []
        
        entropies = []
        
        price_temp = [self.pf] * plag
        
        last_progress_print = 0
        progress_print_freq = 10
        
        count_hist = {State.FUNDAMENTALIST: [], State.OPTIMIST: [], State.PESSIMIST: []}
        for i in range(n_iterations):
            
            price_hist.append(self.price)
            pf_hist.append(self.pf)
            for state in Agent.counts:
                count_hist[state].append(Agent.counts[state] / N)
            
            
            random_pf_change = np.random.normal(0, sigma ** 2) # Update the fundamental price
            self.pf += random_pf_change
            
            
            if prog_print and int(100 * i / n_iterations) - last_progress_print >= progress_print_freq:
                print('Progress: ' + str(int(100 * i / n_iterations)) + '%')
                last_progress_print = int(100 * i / n_iterations)
                
            for j in range(int(1 / delta_t)):
                self.update_price()
                
                price_temp.pop(0)
                price_temp.append(self.price)
                
                self.update_agent_states((self.price - price_temp[0]) / (plag * delta_t))
                self.update_community_states(i)
                
        
        return price_hist, count_hist, pf_hist, entropies

In [None]:
levels = 5
children_per_agent = 5

N = children_per_agent ** (levels - 1)
total_N = int((children_per_agent ** (levels) - 1) / (children_per_agent - 1))

mult = 3
b = 0
a1 = 0

print(N)

In [None]:
info_efficiency = 0.5

In [None]:
symmetric_echo = False
echo_chamber = 0

In [None]:
corruption_t0 = 10000
corruption_t1 = 10000
corruption_S = 1000
corruption_L = None

In [None]:
m = Market()
price_hist, count_hist, pf_hist, entropy = m.run(1600)

print("Volatility: " + str(statistics.stdev(price_hist)))
print("Fundamental Deviation: " + str(get_pf_deviation(price_hist, pf_hist)))

In [None]:
with plt.style.context(['science', 'ieee', 'no-latex']):
    fig, ax = plt.subplots()
    
    width = 10
    height = 6
    fig.set_size_inches(width, height)

    ax.set_ylabel("Price", fontsize=20, labelpad=7)
    ax.set_xlabel("Time", fontsize=20, labelpad=8)
    
    ax.plot(price_hist, label = "Asset Price")
    ax.plot(pf_hist, label = "Fundamental Price")
    ax.autoscale(tight=False)
    ax.legend(prop={'size': 19})
    
    ax.get_xaxis().tick_bottom()
    ax.get_yaxis().tick_left()
    
    plt.xticks(fontsize=12, rotation=0)
    plt.yticks(fontsize=13, rotation=0)
    
    plt.ylim(9.85, 10.6)
    plt.show()
    

In [None]:
data = {"Time": [], "Trader Proportion": [], "Type":[]}

for i in range(0, len(count_hist[State.OPTIMIST]), 5):
    data["Time"].append(i)
    data["Trader Proportion"].append(count_hist[State.OPTIMIST][i])
    data["Type"].append("Optimists")
    
    data["Time"].append(i)
    data["Trader Proportion"].append(count_hist[State.PESSIMIST][i])
    data["Type"].append("Pessimists")
    
df = pd.DataFrame(data)

with plt.style.context(['science', 'ieee', 'no-latex']):
    fig, ax = plt.subplots()
    
    width = 10
    height = 6
    fig.set_size_inches(width, height)

    ax.set_ylabel("Trader Proportion", fontsize=20, labelpad=7)
    ax.set_xlabel("Time", fontsize=20, labelpad=8)
    
    sns.lineplot(ax=ax, data=df, x="Time", y="Trader Proportion", hue = "Type", palette = 'Set1')
    
    ax.get_xaxis().tick_bottom()
    ax.get_yaxis().tick_left()
    
    plt.xticks(fontsize=12, rotation=0)
    plt.yticks(fontsize=13, rotation=0)
    
    
    ax.legend(prop={'size': 19})
    
    ax.autoscale(tight=False)
    plt.show()
    

In [None]:
with plt.style.context(['science', 'ieee', 'no-latex']):
    fig, ax = plt.subplots()
    
    width = 10
    height = 6
    fig.set_size_inches(width, height)

    ax.set_ylabel("Price", fontsize=15, labelpad=3)
    ax.set_xlabel("Time", fontsize=15, labelpad=4)
    
    ax.plot(price_hist[:800], label = "Asset Price")
    ax.plot(pf_hist[:800], label = "Fundamental Price")
    
    ax.axvspan(corruption_t0, corruption_t1, color='red', alpha=0.07, label = 'Community Corrupted')
    
    ax.autoscale(tight=False)
    
    ax.legend(prop={'size': 13})
    
    
    ax.spines[['right', 'top']].set_visible(False)
    
    ax.get_xaxis().tick_bottom()
    ax.get_yaxis().tick_left()
    
    plt.xticks(fontsize=8, rotation=0)
    plt.yticks(fontsize=8, rotation=0)
    # ax.legend()
    plt.show()

In [None]:
def get_rolling_volatility(price_hist, win_size = 50):
    vol = []
    for i in range(2, len(price_hist)):
        vol.append(statistics.stdev(price_hist[max(0, i - win_size) : i]))
        
    return vol

vol = get_rolling_volatility(price_hist)

In [None]:
with plt.style.context(['science', 'ieee', 'no-latex']):
    fig, ax = plt.subplots()
    
    width = 10
    height = 6
    fig.set_size_inches(width, height)

    ax.set_ylabel("Asset Price")
    ax.set_xlabel("Time")
    
    
    ax.plot(vol, label = "Volatility")
    ax.autoscale(tight=False)
    
    ax.legend()
    plt.show()

In [None]:
def get_speculative_hist(count_hist):
    res = []
    for i in range(len(count_hist[State.OPTIMIST])):
        res.append(abs(count_hist[State.OPTIMIST][i] - count_hist[State.PESSIMIST][i]))
        
    return res

speculative_hist = get_speculative_hist(count_hist)
spec_score = sum(speculative_hist) / (sum(count_hist[State.OPTIMIST]) + sum(count_hist[State.PESSIMIST]))
print(spec_score)

In [None]:
bubbles = get_bubbles_pwy(price_hist)

In [None]:
with plt.style.context(['science', 'ieee', 'no-latex']):
    fig, ax = plt.subplots()
    
    width = 10
    height = 6
    fig.set_size_inches(width, height)

    ax.set_ylabel("Price", fontsize=22, labelpad=5)
    ax.set_xlabel("Time", fontsize=22, labelpad=8)
    
    ax.plot(price_hist)
    
    for i in range(len(bubbles)):
        bubble = bubbles[i]
        
        if i == 0:
            ax.axvspan(bubble[0], bubble[1], color='gold', alpha=0.07, label='Significant Bubble Period(s)')
        else:
            ax.axvspan(bubble[0], bubble[1], color='gold', alpha=0.07)
    
    ax.autoscale(tight=False)
    
    ax.legend(prop={'size': 19})
    
    plt.xticks(fontsize=15, rotation=0)
    plt.yticks(fontsize=15, rotation=0)
    
    plt.show()

In [None]:
with plt.style.context(['science', 'ieee', 'no-latex']):
    fig, ax = plt.subplots()
    
    width = 10
    height = 6
    fig.set_size_inches(width, height)

    ax.set_ylabel("Price", fontsize=15, labelpad=3)
    ax.set_xlabel("Time", fontsize=15, labelpad=4)
    
    ax.plot(price_hist, label = "Asset Price")
    ax.plot(pf_hist, label = "Fundamental Price")
    
    for i in range(len(bubbles)):
        bubble = bubbles[i]
        
        if i == 0:
            ax.axvspan(bubble[0], bubble[1], color='gold', alpha=0.07, label='Significant Bubble Period(s)')
        else:
            ax.axvspan(bubble[0], bubble[1], color='gold', alpha=0.07)
    
    ax.autoscale(tight=False)
    
    ax.legend(prop={'size': 13})
    
    
    ax.spines[['right', 'top']].set_visible(False)
    
    ax.get_xaxis().tick_bottom()
    ax.get_yaxis().tick_left()
    
    plt.xticks(fontsize=8, rotation=0)
    plt.yticks(fontsize=8, rotation=0)
    plt.show()

In [None]:
plt.plot(speculative_hist)
plt.show()

In [None]:
def get_time_under_pf(price_hist, pf_hist):
    res = 0
    
    for i in range(len(price_hist)):
        if price_hist[i] <= pf_hist[i]:
            res += 1
            
    return res / len(price_hist)

### Fat-tails of asset returns

#### Expected features:
- Large Kurtosis statistic (~20)
- Returns approximate Gaussian for bigger time intervals
- Power law of returns: The probability for an absolute return greater than a certain number follows the power law distribution with an exponent of about 3 

Heavy tails: “The (unconditional) distribution of returns seems to display a power-law or
Pareto-like tail, with a tail index which is finite, higher than two and less than five for most
data sets studied. In particular this excludes stable laws with infinite variance and the normal
distribution. However the precise form of the tails is difficult to determine.”

Aggregational Gaussianity: “As one increases the timescale ∆t over which returns are calculated, their distribution looks more and more like the normal distribution. In particular, the
shape of the distribution is not the same at different timescales.”

In [None]:
def get_returns(price_hist, T):
    returns = [] # Log changes of market price

    for i in range(T, len(price_hist)):
        returns.append(price_hist[i] - price_hist[i - T])
    
    return returns

def get_abs_returns(price_hist, T):
    returns = [] # Log changes of market price

    for i in range(T, len(price_hist)):
        returns.append(abs(price_hist[i] - price_hist[i - T]))
    
    return returns


def get_log_returns(price_hist, T):
    returns = [] # Log changes of market price

    for i in range(T, len(price_hist)):
        returns.append((math.log(price_hist[i]) - math.log(price_hist[i - T])))
    
    return returns


def calculate_cumulative_return_p(price_hist, T):
    returns = get_abs_returns(price_hist, T) # Log changes of market price

    returns.sort()
    cumulative = {}
    for i in range(len(returns)):
        cumulative[returns[i]] = 1 - i / len(returns)
        
    return list(cumulative.keys()), list(cumulative.values())

def calculate_cumulative_return_dist(price_hist, T):
    returns = get_abs_returns(price_hist, T) # Log changes of market price

    returns.sort()
    cumulative = {}
    for i in range(len(returns)):
        cumulative[returns[i]] = i / len(returns)
        
    return list(cumulative.keys()), list(cumulative.values())

In [None]:
from scipy.interpolate import UnivariateSpline

def get_return_distribution(tau, bins = 30, density = False):
    returns_aggregate = get_log_returns(price_hist, tau) # Log changes of market price (for tau days)
    
    bins = bins
    p,x = np.histogram(returns_aggregate, range=[-0.025, 0.025], bins = bins, density = density)
    p = p.tolist()
    x = x.tolist()
    
    return x[:-1], p

In [None]:
import matplotlib.mlab as mlab
from scipy.stats import norm

from statistics import NormalDist


x1, p1 = get_return_distribution(1, 50, True)
x2, p2 = get_return_distribution(50, 50, True)


with plt.style.context(['science', 'high-vis', 'no-latex']):
    fig, ax = plt.subplots()
    width = 8
    height = 6
    fig.set_size_inches(width, height)
    
    ax.set_ylabel("Frequency Density")
    ax.set_xlabel("Log Returns")
    
    ax.scatter(x1, p1, marker = '+', label = r'$\tau$' + ' = 1')
    ax.scatter(x2, p2, marker = '2', label = r'$\tau$' + ' = 20')
    
    ax.autoscale(tight=False)
    ax.legend()
    plt.show()

In [None]:
scipy.stats.kurtosistest(get_log_returns(price_hist, 1))

In [None]:
import scipy.stats as stats

h = np.asarray(get_log_returns(price_hist, 50))
h = sorted(h)
#use the scipy stats module to fit a normal distirbution with same mean and standard deviation
fit = stats.norm.pdf(h, np.mean(h), np.std(h)) 
#plot both series on the histogram
plt.plot(h,fit,'-',linewidth = 2)
plt.hist(h, density = True, bins = 40)  
plt.show() 

df = pd.DataFrame({'col':h})
print(df.kurt())
print(df.skew())

In [None]:
def get_kurtosis(price_hist, lag):
    
    h = np.asarray(get_log_returns(price_hist, lag))

    h = sorted(h)
    fit = stats.norm.pdf(h, np.mean(h), np.std(h)) 

    df = pd.DataFrame({'col':h})
    return df.kurt()

In [None]:
from scipy.optimize import curve_fit
offset = 0.005

def cubic_pow_law(x, k):
    y = k * (x + offset)**(-3)
    return y

def pow_law(x, A, k):
    y = k * (x + offset)**(-A)
    return y

def get_pareto_exponent(price_hist, pct = 0.1):
    x, y = calculate_cumulative_return_dist(price_hist, 1)
    x = np.asarray(list(x))
    y = np.asarray(list(y))
    
    return hill_estimator(get_returns(price_hist, 1), pct)

In [None]:
ra, rb = calculate_cumulative_return_p(price_hist, 1)
plt.plot(list(ra), list(rb))
plt.show()

plt.show()

parameters = get_pareto_exponent(price_hist)
print("Best fit exponent: " + str(parameters))

In [None]:
x1 = np.linspace(0, 0.005, 100)
y1 = parameters[1] * (x1 + offset) ** (-parameters[0])

with plt.style.context(['science', 'ieee', 'no-latex']):
    fig, ax = plt.subplots()
    
    width = 5
    height = 3
    fig.set_size_inches(width, height)

    ax.set_ylabel("Frequency")
    ax.set_xlabel("Absolute (Log) Returns")
    
    ax.plot(x1, y1, label = '$a(x + ' + str(offset) + r')^{-k}$')
    ax.plot(x, y, label = 'Absolute (log) returns')
    ax.autoscale(tight=False)
    
    ax.legend(prop={'size': 8})
    fig.tight_layout()
    plt.show()
    

### Volatility Clustering


#### Expected features:
- Absence of linear autocorrelations
- Absolute return and square return of the stock price show autocorrelation
- Power law of volatility: autocorrelation function of absolute returns decreases slowly as a power law distribution with exponent approximately ranging from 0.2 to 0.4 

Volatility clustering: “Different measures of volatility display a positive autocorrelation over
several days, which quantifies the fact that high-volatility events tend to cluster in time.”

Slow decay of autocorrelation in absolute returns: “The autocorrelation function of absolute returns decays slowly as a function of the time lag, roughly as a power law with an exponent
β ∈ [0.2, 0.4]. This is sometimes interpreted as a sign of long-range dependence.”

In [None]:
import statsmodels.api as sm

def calculate_autocorr(ps):
    returns_aggregate_abs = []
    returns_aggregate_sq = []
    tau = 20
    for i in range(0, len(ps) - tau):
        returns_aggregate_sq.append((math.log(ps[i + tau]) - math.log(ps[i])) ** 2)
        returns_aggregate_abs.append(abs(math.log(ps[i + tau]) - math.log(ps[i])))
        
    return sm.tsa.acf(returns_aggregate_abs, nlags = 75)[1:], sm.tsa.acf(returns_aggregate_sq, nlags = 75)[1:]

autocorr_abs, autocorr_sq = calculate_autocorr(price_hist)

with plt.style.context(['science', 'ieee', 'no-latex']):
    fig, ax = plt.subplots()
    
    width = 5
    height = 3
    fig.set_size_inches(width, height)

    ax.set_ylabel("Autocorrelation")
    ax.set_xlabel("Lag")
    
    ax.plot(autocorr_abs, label = 'Absolute Log Returns')
    ax.plot(autocorr_sq, label = 'Squared Returns')
    
    ax.autoscale(tight=False)
    
    ax.legend()
    plt.show()


In [None]:
x = np.asarray(range(1, len(autocorr_abs) + 1))
y = np.asarray(list(autocorr_abs))

def pow_law(x, A, B):
    y = B * (x + 0.01)**(A)
    return y

parameters, _ = curve_fit(pow_law, x, y, maxfev=5000)
print("Exponent of decay = " + str(abs(parameters[0])))

In [None]:
def gsadf_test(time_series):
    L = len(time_series)
    
    if L == 100:
        r0 = 0.19
            
    elif L == 200:
        r0 = 0.137
            
    elif L == 400:
        r0 = 0.1
            
    elif L == 800:
        r0 = 0.074
    
    elif L == 1600:
        r0 = 0.055
            
    else:
        print("Invalid Length")
        return
    
    stp = 0.01
    r2 = r0 + stp
    start = None
    
    cv = 1
    
    res = None
    
    while r2 < 1:
        
        sadf, _ = backwards_sadf(time_series, r0, r2)
        
        if res == None or res < sadf:
            res = sadf
            
        r2 += stp
        
    return res


time_series_data = price_hist  


In [None]:
def get_bubbles_pwy(time_series, significance_lvl = 0.9):
    critical_val = None
    
    L = len(time_series)
    min_bubble_size = math.log(L) / L
    if L == 100:
        r0 = 0.19
        if significance_lvl == 0.9:
            critical_val = 1.1
        elif significance_lvl == 0.95:
            critical_val = 1.37
        elif significance_lvl == 0.99:
            critical_val = 1.88
            
    elif L == 200:
        r0 = 0.137
        if significance_lvl == 0.9:
            critical_val = 1.12
        elif significance_lvl == 0.95:
            critical_val = 1.41
        elif significance_lvl == 0.99:
            critical_val = 2.03
            
    elif L == 400:
        r0 = 0.1
        if significance_lvl == 0.9:
            critical_val = 1.19
        elif significance_lvl == 0.95:
            critical_val = 1.49
        elif significance_lvl == 0.99:
            critical_val = 2.05
            
    elif L == 800:
        r0 = 0.074
        if significance_lvl == 0.9:
            critical_val = 1.21
        elif significance_lvl == 0.95:
            critical_val = 1.51
        elif significance_lvl == 0.99:
            critical_val = 2.06
    
    elif L == 1600:
        r0 = 0.055
        if significance_lvl == 0.9:
            critical_val = 1.28 
        elif significance_lvl == 0.95:
            critical_val = 1.57 
        elif significance_lvl == 0.99:
            critical_val = 2.22
            
    else:
        print("Invalid Length")
        return
    
    stp = 0.01
    
    r2 = r0 + stp
    
    start = None
    
    results = []
    
    while r2 < 1:
        sadf, _ = backwards_sadf(time_series, r0, r2)
        
        if sadf >= critical_val:
            start = r2
            r2 += min_bubble_size
            while sadf >= critical_val and r2 < 1:
                sadf, _ = backwards_sadf(time_series, r0, r2)
                
                r2 += stp
                
            results.append(((start) * L, min(L, (r2)* L)))
                
        r2 += stp 
        
    return results

In [None]:
with plt.style.context(['science', 'ieee', 'no-latex']):
    fig, ax = plt.subplots()
    
    width = 10
    height = 6
    fig.set_size_inches(width, height)

    ax.set_ylabel("Asset Price")
    ax.set_xlabel("Time")
    
    ax.plot(price_hist)
    
    ax.axvspan(most_significant_result[0], most_significant_result[1], color='gold', alpha=0.1, label='Most significant bubble period')
    
    ax.autoscale(tight=False)
    
    # ax.legend()
    plt.show()


In [None]:
def plot_sadf(time_series, r0):
    r2 = r0
    stp = 0.01
    start = None
    
    cv = 1
    
    results = []
    
    while r2 < 1:
        
        sadf = backwards_sadf(time_series, r0, r2)
        results.append(sadf[0])
                
        r2 += stp
        
    return results

sadf_plot = plot_sadf(price_hist, 0.05)

In [None]:
plt.plot(sadf_plot)
plt.show()

plt.plot(price_hist)
plt.show()

In [None]:
sadf_plot = plot_sadf(price_hist, 0.1)

plt.plot(sadf_plot)
plt.show()

plt.plot(price_hist)
plt.show()