In [114]:
import numpy as np
import pandas as pd
from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split
from itertools import repeat
import math

In [115]:
# One seed shared by every stochastic step below, so that a fresh
# Restart & Run All reproduces exactly the same data.
SEED = 42
rng = np.random.default_rng(SEED)

# Per-class proportions handed to make_classification (20% / 80%).
weights = [0.2, 0.8]

# Synthetic classification data: 20 features, of which 2 are informative
# and 2 are redundant.
X, y = make_classification(
    n_samples=100000,
    n_features=20,
    n_informative=2,
    weights=weights,
    random_state=SEED,
    n_redundant=2)

num_samples = X.shape[0]

# Random categorical labels, one per sample. They are drawn from the seeded
# generator (not the global np.random state) so they are reproducible too.
categorical_col1 = rng.choice(['A', 'B', 'C'], size=num_samples)
categorical_col2 = rng.choice(['X', 'Y', 'Z'], size=num_samples)

# Only the numeric features go into the frame; y and the categorical arrays
# are not added to it.
df = pd.DataFrame(X, columns=[f'feature_{i}' for i in range(X.shape[1])])

# 80/20 split of the feature frame.
df_train, df_test = train_test_split(df, test_size=0.2, random_state=SEED)

In [116]:
# Rebind X: from here on it is the 1-D array of the training split's
# "feature_0" column (it previously held the full feature matrix).
X = df_train["feature_0"].to_numpy()

In [117]:
"""Convenção de hurst é que separar as amostras em pequenas amostras na qual o tamanho é proporcional à potência de dois"""

'Convenção de hurst é que separar as amostras em pequenas amostras na qual o tamanho é proporcional à potência de dois'

In [118]:
def hurst_v1(X):
    """Rescaled-range (R/S) statistics of X for one power-of-two window length.

    The series is cut into consecutive, non-overlapping windows whose length
    is the largest power of two (at least 2) that divides ``len(X)`` exactly.
    For each window, the range R of the cumulative mean-adjusted values is
    divided by the window's sample standard deviation S (``ddof=1``).

    The raw values of X are analysed; no differencing is applied.

    Parameters
    ----------
    X : array-like
        Input series. Windows are taken along the first axis.

    Returns
    -------
    numpy.ndarray
        R/S of every window, in window order (one entry per window for a
        1-D input). A window with zero standard deviation yields a
        non-finite entry.

    Raises
    ------
    ValueError
        If ``len(X)`` is smaller than 2 or odd, i.e. no power-of-two window
        length of at least 2 divides it.
    """
    series = np.asarray(X)
    size = len(series)
    if size < 2 or size % 2 != 0:
        raise ValueError(
            "len(X) must be an even number >= 2 so that a power-of-two "
            "window length divides it."
        )

    # Divisibility by 2**k implies divisibility by every smaller power of
    # two, so only the largest dividing power is needed. The loop stops at
    # the latest once window * 2 exceeds size.
    window = 2
    while size % (window * 2) == 0:
        window *= 2

    # Splitting the first axis with reshape gives the same windows as
    # indexing with explicit index arrays, without building those arrays.
    windows = series.reshape((size // window, window) + series.shape[1:])

    mean = np.mean(windows, axis=1, keepdims=True)
    S = np.std(windows, axis=1, ddof=1)
    cumulative = np.cumsum(windows - mean, axis=1)
    R = np.max(cumulative, axis=1) - np.min(cumulative, axis=1)
    return R / S


In [121]:
import numpy as np
import math
from scipy.stats import linregress

def hurst_v2(X):
    """Estimate the Hurst exponent of X by rescaled-range (R/S) analysis.

    The first differences of X are split into non-overlapping windows of
    length 2, 4, 8, ... up to the largest power of two that does not exceed
    the number of differences; trailing values that do not fill a whole
    window are dropped. For every window length the mean R/S over its
    windows is computed, and the estimate is the slope of the least-squares
    line of log(mean R/S) against log(window length).

    Windows whose standard deviation is exactly zero have an undefined R/S
    (0/0) and are left out of the mean; a window length without any usable
    window is skipped entirely.

    Parameters
    ----------
    X : array-like
        One-dimensional series. It is differenced once before the analysis.

    Returns
    -------
    numpy.float64
        Slope of the log-log regression.

    Raises
    ------
    ValueError
        If X yields fewer than 10 differences, or if fewer than two window
        lengths have a usable (non-zero-variance) window.
    """
    increments = np.diff(np.asarray(X))
    size = len(increments)

    if size < 10:
        raise ValueError("Dados insuficientes para calcular o expoente de Hurst.")

    # floor(log2(size)) computed on the integer itself, avoiding floats.
    max_exponent = size.bit_length() - 1

    r_s = []
    valid_sizes = []

    for exponent in range(1, max_exponent + 1):
        subsample = 2 ** exponent
        n_windows = size // subsample

        # Drop the tail that does not fill a complete window.
        windows = increments[:n_windows * subsample].reshape(n_windows, subsample)

        mean = np.mean(windows, axis=1, keepdims=True)
        S = np.std(windows, axis=1, ddof=1)
        cumulative = np.cumsum(windows - mean, axis=1)
        R = np.max(cumulative, axis=1) - np.min(cumulative, axis=1)

        # A constant window has R == S == 0; dividing would give NaN and
        # poison the mean, the log and finally the regression slope.
        usable = S != 0
        if not usable.any():
            continue

        r_s.append(np.mean(R[usable] / S[usable]))
        valid_sizes.append(subsample)

    # A line needs at least two points.
    if len(valid_sizes) < 2:
        raise ValueError("Dados insuficientes para calcular o expoente de Hurst.")

    return linregress(np.log(valid_sizes), np.log(r_s)).slope

In [122]:
# Hurst exponent estimate for the training split's feature_0 column (X).
hurst_v2(X)

np.float64(0.1586313770243084)