# Sliding-Window Hurst Exponent

--- 
## Example: **32-point series** (`x1 … x32`) shown with **rolling (overlapping) windows** 

# Rolling windows (overlapping)

**n = 8, stride = 1** → 25 overlapping windows:

```
[ x1 …  x8 ]          (win 1)
   [ x2 …  x9 ]       (win 2)
      [ x3 … x10 ]    (win 3)
          ...
                    [ x25 … x32 ]  (win 25)
```

**n = 16, stride = 1** → 17 overlapping windows:

```
[ x1 … x16 ]          (win 1)
   [ x2 … x17 ]       (win 2)
      [ x3 … x18 ]    (win 3)
          ...
                  [ x17 … x32 ]  (win 17)
```

Use case: **local/rolling H** (per time index). You compute R/S for each overlapping window and (optionally) average or keep the whole rolling curve. This **does not** directly give the canonical H slope unless you **aggregate** per n (e.g., average all rolling R/S at n=8, at n=16, …) and then regress `log(avg R/S)` vs `log(n)`.

 **Rolling (overlap)** = many local R/S values per n (good for time-varying H).
 * If you want canonical H from rolling windows: **average the rolling R/S at each n first**, then do the log–log slope.


In [2]:
import numpy as np

def hurst_rs(x: np.ndarray) -> float:
    x = np.asarray(x, dtype=float).ravel()
    N = x.size
    if N < 2:
        return np.nan
    Y = np.cumsum(x - x.mean())
    R = Y.max() - Y.min()
    S = x.std(ddof=1)
    if S <= 0:
        return np.nan
    return np.log(R / S) / np.log(N)

def rolling_hurst(x, window: int) -> np.ndarray:
    x = np.asarray(x, dtype=float).ravel()
    n = x.size
    out = np.full(n, np.nan, dtype=float)
    if window < 2 or n < window:
        return out
    for i in range(window, n + 1):
        out[i-1] = hurst_rs(x[i-window:i])
    return out

def mean_hurst(x, window: int) -> float:
    rh = rolling_hurst(x, window)
    return float(np.nanmean(rh))

if __name__ == "__main__":
    rng = np.random.default_rng(0)
    steps = rng.standard_normal(4096)   # white noise → ~0.5
    walk  = steps.cumsum()              # random walk → ~1.0

    w = 20
    rh_steps = rolling_hurst(steps, w)
    rh_walk  = rolling_hurst(walk,  w)

    print("Rolling H (white noise): last, mean =", rh_steps[-1], mean_hurst(steps, w))
    print("Rolling H (random walk): last, mean =", rh_walk[-1],  mean_hurst(walk,  w))


Rolling H (white noise): last, mean = 0.5392136978652915 0.4949990155840413
Rolling H (random walk): last, mean = 0.6678687125828524 0.6666094817936171


# Canonical Multi-Window Slope Hurst Exponent

*  for each window size n:
*  split series into non-overlapping chunks of length n
*  for each chunk: compute (R/S) = (range of cumulative deviations) / (std of chunk)
*  take average R/S over chunks at size n
*  fit slope of log(avg R/S) vs log(n) → that slope = Hurst exponent

---

*  You don’t just pick one n (like 20). You try many n values (say 8, 16, 32, … up to N/2).
*  For each n, you get one average 𝑅/𝑆 value.
*  Collect all pairs (log(n), log(E[R/S])).
*  Fit a line through them. The slope of that line is H.

## Example:

Imagine you have a series of 32 points, labeled `x1 … x32`.

---

**n = 8 (window size 8):**
We chop into `32/8 = 4 chunks` of 8 each.

```
[ x1  …  x8 ]   [ x9  …  x16 ]   [ x17 …  x24 ]   [ x25 …  x32 ]
   chunk 1         chunk 2           chunk 3           chunk 4
```

---

**n = 16 (window size 16):**
Now we chop into `32/16 = 2 chunks` of 16 each.

```
[ x1  …  x16 ]   [ x17 …  x32 ]
     chunk 1         chunk 2
```

---

**n = 32 (window size 32):**
Now the whole series is a single chunk.

```
[ x1 … x32 ]
   chunk 1
```

---

At each scale `n`:

1. Within each chunk, compute **R/S**.
2. Average across all chunks at that scale.
3. Record `(log(n), log(avg R/S))`.

Finally, fit a straight line through all these points → **slope = Hurst exponent**.

---

* **Canonical (non-overlap)** = one average R/S per n → **regress slope** → H.
* Notice: chunks at different scales **reuse the same data** but grouped differently. That’s why it’s called multi-scale — not overlapping sliding windows, but re-chunking the whole series at bigger and bigger `n`.


In [3]:
from typing import Iterable, Tuple, List
import numpy as np

def hurst_rs(
        x: Iterable[float], 
        min_window: int = 8, 
        max_window: int | None = None
        ) -> Tuple[float, np.ndarray, np.ndarray]:
    
    x = np.asarray(x, dtype=float).ravel()
    N = x.size
    if max_window is None:
        max_window = max(min_window, N // 2)

    windows: List[int] = []
    w = max(1, min_window)
    while w <= max_window:
        windows.append(w)
        w *= 2
    windows = np.array(windows, dtype=int)

    avg_rs: List[float] = []
    for n in windows:
        k = N // n
        if k < 2:
            avg_rs.append(np.nan)
            continue
        x_trim = x[:k * n]
        chunks = x_trim.reshape(k, n)
        means = chunks.mean(axis=1, keepdims=True)
        demeaned = chunks - means
        cum = demeaned.cumsum(axis=1)
        R = cum.max(axis=1) - cum.min(axis=1)
        S = chunks.std(axis=1, ddof=1)
        mask = S > 0
        RS = (R[mask] / S[mask]) if np.any(mask) else np.array([], dtype=float)
        avg_rs.append(np.nan if RS.size == 0 else RS.mean())

    avg_rs = np.array(avg_rs, dtype=float)
    ok = np.isfinite(avg_rs) & (avg_rs > 0)
    win_ok = windows[ok]
    rs_ok = avg_rs[ok]
    if win_ok.size < 2:
        return float("nan"), windows, avg_rs

    log_n = np.log(win_ok.astype(float))
    log_rs = np.log(rs_ok.astype(float))
    slope, intercept = np.polyfit(log_n, log_rs, 1)
    return float(slope), windows, avg_rs

if __name__ == "__main__":
    rng = np.random.default_rng(0)
    steps = rng.standard_normal(4096)
    walk  = steps.cumsum()
    print("White noise H:", hurst_rs(steps, min_window=8)[0])
    print("Random walk H:", hurst_rs(walk,  min_window=8)[0])


White noise H: 0.5856402798514561
Random walk H: 1.0110888424383455


In [4]:
# Hurst Exponent via classic R/S (rescaled-range) analysis
# Every line is explained simply, like teaching a kid.

from typing import Iterable, Tuple, List
import numpy as np

def hurst_rs(
    x: Iterable[float],          # the time series (a list or array of numbers)
    min_window: int = 8,         # smallest chunk length we will test
    max_window: int | None = None,  # largest chunk length; if None we set it later
) -> Tuple[float, np.ndarray, np.ndarray]:
    # Turn the data into a 1D numpy array (easy to do math with)
    # ravel() in NumPy takes an array of any shape and flattens it into a 1-D view (no copies if possible).
    # like reshape(-1): returns all the elements laid out in a single row vector.
    x = np.asarray(x, dtype=float).ravel()
    # How many points we have
    N = x.size
    # If no max_window given, use half the data length (so we can make at least 2 chunks)
    if max_window is None:
        max_window = max(min_window, N // 2)

    # Make a list of window sizes to try. We use powers of two: 8,16,32,...
    # (Powers of two spread nicely and keep things simple.)
    windows: List[int] = []
    w = max(1, min_window)
    # Jump forward by doubling until we pass max_window
    while w <= max_window:
        windows.append(w)
        w *= 2
    windows = np.array(windows, dtype=int)

    # We will store the average R/S value for each window size here
    avg_rs: List[float] = []

    # Go through each window size (chunk length)
    for n in windows:
        # How many full chunks of length n fit inside the data?
        k = N // n
        # If we cannot make at least 2 chunks, skip (we want to average across chunks)
        if k < 2:
            avg_rs.append(np.nan)
            continue

        # Keep only the part that fits perfectly into k chunks
        x_trim = x[:k * n]
        # Reshape into a matrix with k rows (chunks) and n columns (points per chunk)
        chunks = x_trim.reshape(k, n)

        # For each chunk, compute the chunk mean (average value)
        means = chunks.mean(axis=1, keepdims=True)
        # Subtract the mean so each chunk is "mean-zero" (we look at ups/downs around average)
        demeaned = chunks - means
        # Cumulative sum inside each chunk: walk forward adding the ups/downs
        cum = demeaned.cumsum(axis=1)
        # Range R in each chunk: highest point minus lowest point of the cumulative walk
        R = cum.max(axis=1) - cum.min(axis=1)
        # Standard deviation S in each chunk (how wiggly the original chunk is)
        S = chunks.std(axis=1, ddof=1)  # ddof=1 is the sample std

        # Avoid division by zero: keep only chunks with S>0
        mask = S > 0
        # Compute R/S for the good chunks
        RS = (R[mask] / S[mask]) if np.any(mask) else np.array([], dtype=float)

        # If we have no valid chunks, mark as NaN; else average them
        avg_rs.append(np.nan if RS.size == 0 else RS.mean())

    # Convert list to array for math
    avg_rs = np.array(avg_rs, dtype=float)

    # Keep only window sizes with valid (non-NaN, positive) average R/S
    ok = np.isfinite(avg_rs) & (avg_rs > 0)
    win_ok = windows[ok]
    rs_ok = avg_rs[ok]

    # We need at least 2 points to fit a line (log(R/S) vs log(window))
    if win_ok.size < 2:
        # Not enough data to estimate slope → return NaNs
        return float("nan"), windows, avg_rs

    # Take natural logs: the R/S theory says log(E[R/S]) ≈ H * log(n) + constant
    log_n = np.log(win_ok.astype(float))
    log_rs = np.log(rs_ok.astype(float))

    # Fit a straight line: slope is the Hurst exponent H
    # np.polyfit returns [slope, intercept] when deg=1
    slope, intercept = np.polyfit(log_n, log_rs, 1)
    H = float(slope)

    # Return H, plus the diagnostic vectors (windows and avg R/S per window)
    return H, windows, avg_rs


# ---------------------------
# Minimal demo you can run:
if __name__ == "__main__":
    rng = np.random.default_rng(0)
    steps = rng.standard_normal(4096)   # white noise
    walk  = steps.cumsum()              # random walk

    print("White noise H:", hurst_rs(steps, min_window=8)[0])  # ~0.5
    print("Random walk H:", hurst_rs(walk,  min_window=8)[0])  # ~1.0


White noise H: 0.5856402798514561
Random walk H: 1.0110888424383455


# Rolling-Canononical Hurst
---
### Combine the Canonical Multi-Window Hurst with a sliding-window feature to get summary value over the past 100-observations for each current observation.

In [5]:
import numpy as np
from typing import Iterable, List

def hurst_rs_multiscale(x: np.ndarray, min_window: int = 8) -> float:
    x = np.asarray(x, dtype=float).ravel()
    N = x.size
    if N < 2 or N < 2*min_window:
        return np.nan

    # powers of two: 8,16,32,... up to N//2
    windows: List[int] = []
    w = max(1, min_window)
    while w <= N // 2:
        windows.append(w)
        w *= 2
    windows = np.array(windows, dtype=int)

    avg_rs: List[float] = []
    for n in windows:
        k = N // n
        if k < 2:
            avg_rs.append(np.nan)
            continue
        x_trim = x[:k*n].reshape(k, n)
        dm = x_trim - x_trim.mean(axis=1, keepdims=True)
        cum = dm.cumsum(axis=1)
        R = cum.max(axis=1) - cum.min(axis=1)
        S = x_trim.std(axis=1, ddof=1)
        mask = S > 0
        rs = (R[mask] / S[mask]) if np.any(mask) else np.array([], dtype=float)
        avg_rs.append(np.nan if rs.size == 0 else rs.mean())

    avg_rs = np.array(avg_rs, dtype=float)
    ok = np.isfinite(avg_rs) & (avg_rs > 0)
    if ok.sum() < 2:
        return np.nan

    log_n  = np.log(windows[ok].astype(float))
    log_rs = np.log(avg_rs[ok].astype(float))
    slope, _ = np.polyfit(log_n, log_rs, 1)
    return float(slope)

def rolling_canonical_hurst(x: Iterable[float], lookback: int = 100, min_window: int = 8) -> np.ndarray:
    x = np.asarray(x, dtype=float).ravel()
    n = x.size
    out = np.full(n, np.nan, dtype=float)
    if lookback < 2*min_window or n < lookback:
        return out
    for t in range(lookback, n+1):
        out[t-1] = hurst_rs_multiscale(x[t-lookback:t], min_window=min_window)
    return out

# demo
if __name__ == "__main__":
    rng = np.random.default_rng(0)
    steps = rng.standard_normal(4096)  # white noise
    walk  = steps.cumsum()             # random walk

    hc_steps = rolling_canonical_hurst(steps, lookback=100, min_window=8)
    hc_walk  = rolling_canonical_hurst(walk,  lookback=100, min_window=8)

    print("White noise: last H =", hc_steps[-1], "mean H =", np.nanmean(hc_steps))
    print("Random walk: last H =", hc_walk[-1],  "mean H =", np.nanmean(hc_walk))


White noise: last H = 0.6399108850673125 mean H = 0.6386710086082407
Random walk: last H = 1.0780197713541193 mean H = 1.01476920809898
