<a href="https://colab.research.google.com/github/filipchudzynski/stock-market-non-gaussianity-analyzer_v2/blob/main/toy_models.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# Fetch the project repository; the import cell below adds its toy_models/ and
# toy_models/testing_library directories (under /content) to sys.path.
! git clone https://github.com/filipchudzynski/stock-market-non-gaussianity-analyzer_v2.git

Cloning into 'stock-market-non-gaussianity-analyzer_v2'...
remote: Enumerating objects: 55, done.[K
remote: Counting objects: 100% (55/55), done.[K
remote: Compressing objects: 100% (42/42), done.[K
remote: Total 55 (delta 16), reused 34 (delta 4), pack-reused 0 (from 0)[K
Receiving objects: 100% (55/55), 1.03 MiB | 4.64 MiB/s, done.
Resolving deltas: 100% (16/16), done.


In [2]:
import sys
import plotly.express as px
import numpy as np

sys.path.append("/content/stock-market-non-gaussianity-analyzer_v2/toy_models/")
sys.path.append("/content/stock-market-non-gaussianity-analyzer_v2/toy_models/testing_library")
from model1_white_noise import generate as generate_white_noise
from model2_brownian_motion import generate as generate_brownian_motion
from model3_trend_plus_noise import generate as generate_trend_plus_noise
from model4_regime_switching_variance import generate as generate_regime_switching_variance
from model5_lognormal_cascade import generate as generate_lognormal_cascade
from model6_multifractal_random_walk import generate as generate_multifractal_random_walk
from model7_coupled_cascades import generate as generate_coupled_cascades

from testing_library.detrending import moving_average
from testing_library.increments import increments
from testing_library.intermittency import lambda2
from intermittency_epjst_extension3 import lambda2_lognormal, mutual_information_knn
from testing_library.mi import mutual_information

In [None]:
# Visual check of the moving-average detrender on each synthetic trend shape:
# one figure per trend type showing the signal, the estimated trend and their
# difference.
DETREND_WINDOW = 10  # value passed to the MovingAverageDetrender constructor
# NOTE(review): the scale handed to detrend_point (50) differs from the
# constructor value (10); the later cells use the same value for both — confirm
# this mismatch is intended.
DETREND_SCALE = 50
PLOT_OFFSET = 5  # vertical shift applied to signal and trend so they sit above the residual

detrender = moving_average.MovingAverageDetrender(DETREND_WINDOW)
for trend_type in ["linear", "quadratic", "sinusoidal"]:
    # Convert once so every operation below is array arithmetic, whatever
    # sequence type the generator returns.
    trend_plus_noise = np.asarray(generate_trend_plus_noise(trend_type=trend_type))

    # detrend_point's return value is used as the trend estimate at index i.
    trend = np.array([
        detrender.detrend_point(trend_plus_noise, i, DETREND_SCALE)
        for i in range(len(trend_plus_noise))
    ])

    fig = px.scatter(
        y=[
            trend_plus_noise + PLOT_OFFSET,
            trend + PLOT_OFFSET,
            trend_plus_noise - trend,
        ],
        title=(
            f"Trend + noise ({trend_type}): signal and trend shifted by "
            f"+{PLOT_OFFSET}, residual unshifted"
        ),
    )
    fig.update_traces(marker_size=2)
    fig.show()


# Detrending and increments

In [14]:
from plotly.subplots import make_subplots
import plotly.graph_objects as go
import numpy as np

# --- Noise generators ---
from model1_white_noise import generate as generate_white_noise
from model2_brownian_motion import generate as generate_brownian_motion
from model3_trend_plus_noise import generate as generate_trend_plus_noise
from model4_regime_switching_variance import generate as generate_regime_switching_variance
from model5_lognormal_cascade import generate as generate_lognormal_cascade
from model6_multifractal_random_walk import generate as generate_multifractal_random_walk

# --- Scales: lag for the increments, scale for the moving-average detrender ---
increments_s = 10
detrending_s = 10

# --- Detrending tool shared by every model ---
detrender = moving_average.MovingAverageDetrender(detrending_s)

# --- One realisation per toy model, keyed by its display name ---
generators = (
    ("White Noise", generate_white_noise),
    ("Brownian Motion", generate_brownian_motion),
    ("Trend + Noise", generate_trend_plus_noise),
    ("Regime Switching Variance", generate_regime_switching_variance),
    ("Lognormal Cascade", generate_lognormal_cascade),
    ("Multifractal Random Walk", generate_multifractal_random_walk),
)
series = {label: make_series() for label, make_series in generators}

# --- Increments of every series at lag `increments_s` ---
increments_dict = {}
for name, data in series.items():
    increments_dict[name] = increments.compute_increments(data, increments_s)

# --- Detrended series: raw data minus the point-wise trend estimate ---
detrended_dict = {}
for name, data in series.items():
    trend = np.array([
        detrender.detrend_point(data, idx, detrending_s)  # trend estimate at idx
        for idx in range(len(data))
    ])
    detrended_dict[name] = data - trend

# --- Subplot titles, row by row (raw, increments, detrended) ---
titles = [
    f"{name} ({view})"
    for name in series
    for view in ("Raw", "Increments", "Detrended")
]

# --- Grid with one row per model and three columns ---
fig = make_subplots(
    rows=len(series),
    cols=3,
    shared_xaxes=False,
    subplot_titles=titles,
)

# --- Fill the grid: column order matches the title order above ---
columns = (
    (series, "Raw"),
    (increments_dict, "Increments"),
    (detrended_dict, "Detrended"),
)
for row, name in enumerate(series, start=1):
    for col, (values_by_model, view) in enumerate(columns, start=1):
        trace = go.Scatter(
            y=values_by_model[name],
            mode="lines",
            name=f"{name} {view}",
        )
        fig.add_trace(trace, row=row, col=col)

fig.update_layout(
    height=300 * len(series),
    width=1600,
    title_text=f"Noise Models: Raw • Increments(s={increments_s}) • Detrended(s={detrending_s})",
)

fig.show()


# Lambda2 (λ₂) estimation

In [19]:
from plotly.subplots import make_subplots
import plotly.graph_objects as go
import numpy as np
from scipy.stats import kurtosis


# --- Noise generators ---
from model1_white_noise import generate as generate_white_noise
from model2_brownian_motion import generate as generate_brownian_motion
from model3_trend_plus_noise import generate as generate_trend_plus_noise
from model4_regime_switching_variance import generate as generate_regime_switching_variance
from model5_lognormal_cascade import generate as generate_lognormal_cascade
from model6_multifractal_random_walk import generate as generate_multifractal_random_walk

# --- Lag of the increments whose distribution is analysed ---
# Named here (instead of a bare 10 inside the loop) so the cell is
# self-contained and the scale is easy to change.
increments_s = 10

# --- Collect all series: one realisation per toy model ---
series = {
    "White Noise": generate_white_noise(),
    "Brownian Motion": generate_brownian_motion(),
    "Trend + Noise": generate_trend_plus_noise(),
    "Regime Switching Variance": generate_regime_switching_variance(),
    "Lognormal Cascade": generate_lognormal_cascade(),
    "Multifractal Random Walk": generate_multifractal_random_walk(),
}

# --- Compute increments + both lambda2 estimates for every model ---
increments_dict = {}
lambda2_dict = {}
lambda2_lognormal_dict = {}

for name, data in series.items():
    inc = increments.compute_increments(data, increments_s)
    increments_dict[name] = inc
    lambda2_dict[name] = lambda2.estimate_lambda2(inc)
    lambda2_lognormal_dict[name] = lambda2_lognormal.estimate_lambda2_lognormal(inc)

# --- Build subplot titles (row-wise: raw signal, then increment histogram) ---
titles = []
for name in series.keys():
    lam = lambda2_dict[name]
    lam_log = lambda2_lognormal_dict[name]
    # The closing parenthesis was missing here, so titles rendered as "... (Raw ".
    titles.append(f"{name} (Raw)")
    titles.append(f"{name} (Distribution, λ₂={lam:.3f} λ₂ logn={lam_log:.3f})")

# --- Create 2-column subplot ---
fig = make_subplots(
    rows=len(series),
    cols=2,
    shared_xaxes=False,
    subplot_titles=titles
)

# --- Add traces ---
for i, name in enumerate(series.keys(), start=1):

    # Column 1: raw signal
    fig.add_trace(
        go.Scatter(
            y=series[name],
            mode="lines",
            name=name
        ),
        row=i, col=1
    )

    # Column 2: histogram of increments, normalised to a probability density
    fig.add_trace(
        go.Histogram(
            x=increments_dict[name],
            histnorm="probability density",
            opacity=0.7,
            name=name
        ),
        row=i, col=2
    )

fig.update_layout(
    height=300 * len(series),
    width=1400,
    title_text="Raw Signals and Increment Distributions with λ₂ Estimates",
    showlegend=False
)

fig.show()


In [27]:
import numpy as np
import plotly.express as px
from sklearn.metrics import mutual_info_score

def mutual_information_binned(x, y, bins=30):
    """Estimate the mutual information of two series after equal-width binning.

    Pairs in which either value is NaN are dropped, each series is discretised
    into ``bins`` equal-width bins spanning its own min..max, and the resulting
    integer labels are scored with ``sklearn.metrics.mutual_info_score``.

    Parameters
    ----------
    x, y : array-like
        Series of identical shape (lists are accepted).
    bins : int or other ``numpy.histogram_bin_edges`` bin specification
        Binning applied independently to ``x`` and ``y``.

    Returns
    -------
    tuple
        ``(mi, [x_binned, y_binned])`` where ``mi`` is the score returned by
        ``mutual_info_score`` and the two arrays hold the bin label of every
        retained sample (``0 .. bins-1`` for an integer ``bins``).

    Raises
    ------
    ValueError
        If the inputs differ in shape or no pair survives NaN removal.
    """
    x = np.asarray(x, dtype=float)
    y = np.asarray(y, dtype=float)
    if x.shape != y.shape:
        raise ValueError(
            f"x and y must have the same shape, got {x.shape} and {y.shape}"
        )

    # remove NaNs (pairwise, so the two series stay aligned)
    mask = ~np.isnan(x) & ~np.isnan(y)
    x, y = x[mask], y[mask]
    if x.size == 0:
        raise ValueError("no samples left after removing NaNs")

    # Bin continuous values. Only the interior edges are handed to digitize:
    # with the outer edges included the maximum sample lands in an extra
    # (bins+1)-th bin of its own instead of in the last bin.
    x_binned = np.digitize(x, np.histogram_bin_edges(x, bins=bins)[1:-1])
    y_binned = np.digitize(y, np.histogram_bin_edges(y, bins=bins)[1:-1])

    return mutual_info_score(x_binned, y_binned), [x_binned, y_binned]

def _show_series_pair(series_pair, title):
    """Plot two series as thin overlaid lines, show the figure and return it."""
    fig = px.line(
        y=series_pair,
        labels={"value": "Signal", "index": "Time"},
        title=title
    )
    fig.update_traces(line_width=1)
    fig.show()
    return fig


# --- Generate data ---
cascade = generate_coupled_cascades()   # returns (x, y)
x1, y1 = cascade[0], cascade[1]

white1 = generate_white_noise()
white2 = generate_white_noise()


# --- Compute mutual information (library estimator) ---
# NOTE(review): in the saved run these two values are 8.3178 and 8.5172, i.e.
# ln(4096) and ln(5000), and sklearn warned that it "received continuous
# values". That is what a discrete MI score yields when every sample is its own
# label, so the numbers probably reflect series length rather than dependence.
# Confirm how mutual_information.mutual_information discretises its inputs.
mi_cascade = mutual_information.mutual_information(x1, y1)
mi_white = mutual_information.mutual_information(white1, white2)

print("Mutual Information (Coupled Cascades):", mi_cascade)
print("Mutual Information (White Noise):", mi_white)


# --- Compute mutual information on binned values (also keeps the bin labels) ---
mi_cascade_binned,binned_cascade_series = mutual_information_binned(x1, y1)
mi_white_binned,binned_white_series = mutual_information_binned(white1, white2)

print("MI (Coupled Cascades):", mi_cascade_binned)
print("MI (White Noise):", mi_white_binned)


# --- Plot cascades ---
fig1 = _show_series_pair(
    [x1, y1],
    f"Coupled Cascades (MI = {mi_cascade:.4f})"
)

# --- Plot white noises ---
fig2 = _show_series_pair(
    [white1, white2],
    f"Independent White Noises (MI = {mi_white:.4f})"
)

print("Binned")

# --- Plot binned cascades ---
fig3 = _show_series_pair(
    binned_cascade_series,
    f"Coupled Cascades (MI = {mi_cascade_binned:.4f} binned)"
)

# --- Plot binned white noises ---
fig4 = _show_series_pair(
    binned_white_series,
    f"Independent White Noises (MI = {mi_white_binned:.4f} binned)"
)


Mutual Information (Coupled Cascades): 8.317766166719341
Mutual Information (White Noise): 8.517193191416238
MI (Coupled Cascades): 0.838756361760313
MI (White Noise): 0.05537797418284584



Clustering metrics expects discrete values but received continuous values for label, and continuous values for target


Clustering metrics expects discrete values but received continuous values for label, and continuous values for target



Binned


In [None]:
 # Quick look at one white-noise realisation (titled so the figure stands alone).
 px.line(generate_white_noise(), title="White Noise")

In [None]:
# Quick look at one Brownian-motion realisation (titled so the figure stands alone).
px.line(generate_brownian_motion(), title="Brownian Motion")

In [None]:
# Quick look at one trend-plus-noise realisation with the generator's default arguments.
px.line(generate_trend_plus_noise(), title="Trend + Noise")

In [None]:
# Quick look at one regime-switching-variance realisation (titled so the figure stands alone).
px.line(generate_regime_switching_variance(), title="Regime Switching Variance")

In [None]:
# Quick look at one multifractal-random-walk realisation (titled so the figure stands alone).
px.line(generate_multifractal_random_walk(), title="Multifractal Random Walk")

In [None]:
# Overlay the two series returned by the coupled-cascades generator.
# The name `cascade` is kept because the following cell displays it.
cascade = generate_coupled_cascades()
px.line(y=[cascade[0], cascade[1]], title="Coupled Cascades")

In [None]:
# Display the raw generator result; the saved output shows a tuple of two arrays.
cascade

(array([-0.72878188, -0.91572732,  0.18044591, ..., 29.83307048,
        28.68486131, 28.35225579]),
 array([ -0.22103911,   1.54025883,   1.10357456, ..., -23.01717776,
        -24.30714317, -23.18346361]))