# DE40 Trend and Range Analysis

## Objective
Analyze the trend characteristics and range behavior of the DAX Index (DE40) to understand market structure, volatility patterns, and mean-reversion vs. momentum tendencies.

## Key Metrics
- **Hurst Exponent**: Measure of long-range dependence in returns (H > 0.5 trending, H < 0.5 mean-reverting, H ≈ 0.5 random walk)
- **Autocorrelation**: Price momentum persistence
- **Range Metrics**: Intraday range, volatility clustering
- **Gap Analysis**: Data quality and market anomalies

## 1. Environment Setup

In [None]:
import sys
sys.path.insert(0, '../../')

from shared.database_connector import fetch_ohlcv, get_date_range
from shared.data_module import process_data
from shared.config import SYMBOLS
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from datetime import datetime, timedelta
import warnings
warnings.filterwarnings('ignore')

# Give every figure below the same grid style and default canvas size
sns.set_style("whitegrid")
plt.rcParams.update({'figure.figsize': (14, 6)})

# Confirm the imports resolved, then show the configured DE40 symbol entry
print("[OK] Environment setup complete")
de40_symbol_info = SYMBOLS['deuidxeur']
print(f"DE40 Symbol Info: {de40_symbol_info}")

## 2. Data Fetching and Processing

In [None]:
# Ask the database connector which span of H1 bars is stored for DE40
date_range = get_date_range('deuidxeur', 'h1')
first_available, last_available = date_range['start'], date_range['end']
print(f"Available data: {first_available} to {last_available}")

# Analysis window: the 365 days leading up to the most recent stored bar
end_date = last_available
lookback = timedelta(days=365)
start_date = end_date - lookback

print(f"\nAnalysis period: {start_date.date()} to {end_date.date()}")

In [None]:
# Fetch the raw hourly OHLCV bars for the analysis window chosen above
df_raw = fetch_ohlcv(
    symbol='deuidxeur',
    timeframe='h1',
    start_date=start_date,
    end_date=end_date
)

print(f"Raw data shape: {df_raw.shape}")
print("\nRaw data info:")
# DataFrame.info() writes its report itself and returns None, so wrapping it
# in print() would append a stray "None" line to the cell output.
df_raw.info()
print("\nFirst 5 rows:")
print(df_raw.head())

In [None]:
# Process data (this will handle gaps, outliers, imputation, timezone conversion)
df_clean = process_data(
    df=df_raw,
    symbol='deuidxeur',
    timeframe='h1',
    local_time=True,  # Convert to Europe/Berlin timezone
    exclude_news=False
)

print(f"Cleaned data shape: {df_clean.shape}")
print("\nData after processing:")
# DataFrame.info() writes its report itself and returns None, so wrapping it
# in print() would append a stray "None" line to the cell output.
df_clean.info()
print(f"\nTimezone: {df_clean.index.tz}")
print("\nFirst 5 rows (cleaned):")
print(df_clean.head())

## 3. Gap Analysis - Data Quality Assessment

In [None]:
# Count null cells per column before and after processing
missing_before = df_raw.isnull().sum()
missing_after = df_clean.isnull().sum()

print("Missing Data Statistics:")
print("\nBefore Processing (raw, unfiltered):")
print(missing_before)
print("\nAfter Processing (filtered to market hours):")
print(missing_after)

# Gap Analysis - Raw Data (ALL timestamps including nights)
print("\n" + "="*70)
print("Gap Analysis - RAW DATA (includes night hours, weekends, holidays)")
print("="*70)
# A gapless hourly series has one bar per elapsed hour PLUS the first bar
# itself (fencepost). The +1 also keeps the denominator below from being zero
# when the frame holds a single bar.
expected_candles_raw = (df_raw.index[-1] - df_raw.index[0]).total_seconds() / 3600 + 1
actual_candles_raw = len(df_raw)
gap_percentage_raw = ((expected_candles_raw - actual_candles_raw) / expected_candles_raw * 100)

print(f"Expected candles (continuous, all hours): {expected_candles_raw:.0f}")
print(f"Actual candles: {actual_candles_raw}")
print(f"Missing candles: {expected_candles_raw - actual_candles_raw:.0f} ({gap_percentage_raw:.2f}%)")
print("Note: This includes night hours when market is CLOSED - not meaningful for trading")

# Gap Analysis - Clean Data (MARKET HOURS ONLY)
print("\n" + "="*70)
print("Gap Analysis - CLEAN DATA (market hours only: 09:00-17:30 Berlin time)")
print("="*70)
# The labels above describe df_clean as restricted to trading sessions, so the
# elapsed wall-clock span is NOT a valid expectation here: it would count every
# night and weekend as missing data. Instead, use the fullest session present
# as the reference bar count and expect that many bars on every session in the
# frame.
# NOTE(review): sessions absent altogether cannot be told apart from holidays
# and are not counted as gaps; shortened sessions are counted as gaps.
bars_per_session = df_clean.index.normalize().value_counts()
sessions_present = len(bars_per_session)
expected_candles_clean = int(bars_per_session.max()) * sessions_present if sessions_present else 0
actual_candles_clean = len(df_clean)
gap_percentage_clean = ((expected_candles_clean - actual_candles_clean) / expected_candles_clean * 100) if expected_candles_clean > 0 else 0

print(f"Expected candles (market hours only): {expected_candles_clean:.0f}")
print(f"Actual candles: {actual_candles_clean}")
print(f"Missing candles: {expected_candles_clean - actual_candles_clean:.0f} ({gap_percentage_clean:.2f}%)")
print(f"Data quality during market hours: {100 - gap_percentage_clean:.1f}%")

print("\n" + "="*70)
print("Summary:")
print(f"  Raw data candles: {actual_candles_raw}")
print(f"  Filtered candles (market hours): {actual_candles_clean}")
print(f"  Removed (night hours, weekends, holidays): {actual_candles_raw - actual_candles_clean}")
print(f"  Data quality during market hours: {100 - gap_percentage_clean:.1f}%")
print("="*70)

In [None]:
# Count null cells per column before and after processing
missing_before = df_raw.isnull().sum()
missing_after = df_clean.isnull().sum()

print("Missing Data Statistics:")
print("\nBefore Processing (raw, unfiltered):")
print(missing_before)
print("\nAfter Processing (filtered to market hours):")
print(missing_after)

# Gap Analysis - Raw Data (ALL timestamps including nights)
print("\n" + "="*70)
print("Gap Analysis - RAW DATA (includes night hours, weekends, holidays)")
print("="*70)
print("Note: Raw data contains ALL hours from database (UTC timestamps)")
# A gapless hourly series has one bar per elapsed hour PLUS the first bar
# itself (fencepost). The +1 also keeps the denominator below from being zero
# when the frame holds a single bar.
expected_candles_raw = (df_raw.index[-1] - df_raw.index[0]).total_seconds() / 3600 + 1
actual_candles_raw = len(df_raw)
gap_percentage_raw = ((expected_candles_raw - actual_candles_raw) / expected_candles_raw * 100)

print(f"Expected candles (continuous, all hours): {expected_candles_raw:.0f}")
print(f"Actual candles: {actual_candles_raw}")
print(f"Missing candles: {expected_candles_raw - actual_candles_raw:.0f} ({gap_percentage_raw:.2f}%)")
print("Reason: Includes night hours (23:00-09:00 in DAX local time) when market is CLOSED")

# Gap Analysis - Clean Data (MARKET HOURS ONLY)
print("\n" + "="*70)
print("Gap Analysis - CLEAN DATA (market hours only: 09:00-17:30 Berlin time)")
print("="*70)
print("Note: Data is already FILTERED to trading hours")
# Because df_clean is described as already filtered to trading sessions, the
# elapsed wall-clock span is NOT a valid expectation here: it would count every
# night and weekend as missing data. Instead, use the fullest session present
# as the reference bar count and expect that many bars on every session in the
# frame.
# NOTE(review): sessions absent altogether cannot be told apart from holidays
# and are not counted as gaps; shortened sessions are counted as gaps.
bars_per_session = df_clean.index.normalize().value_counts()
sessions_present = len(bars_per_session)
expected_candles_clean = int(bars_per_session.max()) * sessions_present if sessions_present else 0
actual_candles_clean = len(df_clean)
gap_percentage_clean = ((expected_candles_clean - actual_candles_clean) / expected_candles_clean * 100) if expected_candles_clean > 0 else 0

print(f"Expected candles (continuous, market hours only): {expected_candles_clean:.0f}")
print(f"Actual candles: {actual_candles_clean}")
print(f"Missing candles: {expected_candles_clean - actual_candles_clean:.0f} ({gap_percentage_clean:.2f}%)")
print(f"Data quality during market hours: {100 - gap_percentage_clean:.1f}%")
print("\nDST Note: Europe/Berlin timezone automatically applies CET (UTC+1) in winter and CEST (UTC+2) in summer")

print("\n" + "="*70)
print("Summary:")
print(f"  Raw data candles (all hours): {actual_candles_raw}")
print(f"  Filtered candles (09:00-17:30, weekdays, non-holidays): {actual_candles_clean}")
print(f"  Removed (night hours, weekends, holidays): {actual_candles_raw - actual_candles_clean}")
print(f"  Coverage: {(actual_candles_clean/actual_candles_raw)*100:.1f}% of raw data is trading hours")
print(f"  Data quality during market hours: {100 - gap_percentage_clean:.1f}%")
print("="*70)

## 4. Hurst Exponent Calculation

In [None]:
def calculate_hurst_exponent(price_series, max_lag=1000):
    """
    Estimate the Hurst exponent of a price series with rescaled-range (R/S) analysis.

    For every window length ``lag`` in ``range(10, max_lag, 10)`` the log
    returns are cut into consecutive non-overlapping windows of that length.
    Each window's R/S value is the range of its cumulative deviations from the
    window mean divided by the window's sample standard deviation; the values
    are averaged per lag. The exponent is the slope of log(R/S) against
    log(lag).

    H = 0.5: Random walk (no trend, no mean reversion)
    H > 0.5: Trending market (momentum)
    H < 0.5: Mean-reverting market

    Parameters
    ----------
    price_series : pandas.Series
        Price levels in chronological order.
    max_lag : int, default 1000
        Exclusive upper bound on the window length. Lags longer than the
        number of available returns are skipped.

    Returns
    -------
    tuple
        ``(hurst, lags, tau)``: the fitted slope, the array of window lengths
        actually used, and the matching array of average R/S values.

    Raises
    ------
    ValueError
        If fewer than two lags yield a usable R/S value, so no slope can be
        fitted.
    """
    # The returns do not depend on the lag, so compute them once up front.
    log_returns = np.log(price_series / price_series.shift(1)).dropna().to_numpy(dtype=float)
    # Drop +/-inf produced by zero prices; they would poison every statistic.
    log_returns = log_returns[np.isfinite(log_returns)]
    n_returns = log_returns.size

    used_lags = []
    tau = []
    for lag in range(10, max_lag, 10):
        n_windows = n_returns // lag
        if n_windows == 0:
            # This lag, and every longer one, exceeds the series length.
            break

        # One row per non-overlapping window, so all of the data contributes
        # rather than only the first `lag` observations.
        windows = log_returns[:n_windows * lag].reshape(n_windows, lag)

        # Deviations from each window's own mean, matching the std below.
        deviations = windows - windows.mean(axis=1, keepdims=True)
        cumulative = np.cumsum(deviations, axis=1)
        ranges = cumulative.max(axis=1) - cumulative.min(axis=1)
        stds = windows.std(axis=1, ddof=1)

        # Flat windows have no defined R/S value; leave them out of the mean.
        usable = (stds > 0) & (ranges > 0)
        if usable.any():
            # Record the lag together with its value so the two never drift
            # out of alignment when a lag is skipped.
            used_lags.append(lag)
            tau.append(np.mean(ranges[usable] / stds[usable]))

    if len(tau) < 2:
        raise ValueError(
            "Not enough data to estimate the Hurst exponent: "
            f"{n_returns} usable returns for lags below {max_lag}"
        )

    lags = np.array(used_lags)
    tau = np.array(tau)

    # Log-log regression to find slope (Hurst exponent)
    hurst = np.polyfit(np.log(lags), np.log(tau), 1)[0]

    return hurst, lags, tau


print("Hurst Exponent function defined")

In [None]:
# Calculate Hurst exponent for close prices
hurst, lags, tau = calculate_hurst_exponent(df_clean['close'], max_lag=500)

print("\nHurst Exponent Analysis:")
print(f"Hurst Exponent: {hurst:.4f}")

# A floating-point estimate is essentially never exactly 0.5, so an equality
# test would leave the random-walk case unreachable. Treat a narrow band around
# 0.5 as "random walk" instead.
RANDOM_WALK_TOLERANCE = 0.01
is_random_walk = abs(hurst - 0.5) <= RANDOM_WALK_TOLERANCE

if is_random_walk:
    interpretation = "Random Walk (H ≈ 0.5)"
elif hurst < 0.45:
    interpretation = "Strong Mean-Reversion (H < 0.45)"
elif hurst < 0.5:
    interpretation = "Mild Mean-Reversion (0.45 ≤ H < 0.5)"
elif hurst < 0.55:
    interpretation = "Mild Momentum/Trend (0.5 < H < 0.55)"
else:
    interpretation = "Strong Trend (H ≥ 0.55)"

print(f"Interpretation: {interpretation}")
print("\nImplication for Trading:")
if is_random_walk:
    # Do not suggest a directional edge when H is indistinguishable from 0.5.
    print("  -> No clear edge for either mean-reversion or momentum strategies")
elif hurst < 0.5:
    print("  -> Mean-reversion strategies may be profitable")
    print("  -> Momentum-following strategies may underperform")
else:
    print("  -> Trending/momentum strategies may be profitable")
    print("  -> Mean-reversion strategies may underperform")

In [None]:
# Visualize Hurst exponent calculation
fig, ax = plt.subplots(figsize=(12, 6))

# Log-log plot of the measured rescaled range per lag
ax.loglog(lags, tau, 'bo', markersize=6, label='Rescaled Range (tau)')

# Fit the log-log regression once and reuse both coefficients, instead of
# running the identical polyfit twice inside one expression.
log_lags = np.log(lags)
slope, intercept = np.polyfit(log_lags, np.log(tau), 1)
fit_tau = np.exp(slope * log_lags + intercept)
ax.loglog(lags, fit_tau, 'r--', linewidth=2, label=f'Fit (H = {hurst:.4f})')

ax.set_xlabel('Time Lag (hours)', fontsize=11, fontweight='bold')
ax.set_ylabel('Rescaled Range (R/S)', fontsize=11, fontweight='bold')
ax.set_title(f'Hurst Exponent Analysis - DE40 (H = {hurst:.4f})', fontsize=13, fontweight='bold')
ax.legend(fontsize=10)
ax.grid(True, which='both', alpha=0.3)

plt.tight_layout()
plt.show()

## 5. Autocorrelation Analysis

In [None]:
from statsmodels.graphics.tsaplots import plot_acf, plot_pacf
from statsmodels.tsa.stattools import adfuller

# Calculate hourly log returns on the cleaned close prices
returns = np.log(df_clean['close'] / df_clean['close'].shift(1)).dropna()

# Annualisation must use the number of bars actually present per year. A fixed
# 252*24 assumes 24 bars per trading day, which overstates the factor when the
# frame holds trading-session bars only (as this notebook's output describes).
# Measure the rate from the data instead.
span_years = (returns.index[-1] - returns.index[0]).total_seconds() / (365.25 * 24 * 3600)
periods_per_year = len(returns) / span_years if span_years > 0 else float('nan')

print("Return Statistics:")
print(f"Mean return: {returns.mean() * 100:.4f}%")
print(f"Std dev: {returns.std() * 100:.4f}%")
print(f"Sharpe ratio (annualized): {(returns.mean() / returns.std()) * np.sqrt(periods_per_year):.4f}")

# ADF test for stationarity (index 0 = test statistic, index 1 = p-value)
adf_result = adfuller(returns, autolag='AIC')
print("\nADF Test (Returns):")
print(f"ADF Statistic: {adf_result[0]:.6f}")
print(f"P-value: {adf_result[1]:.6f}")
print(f"Stationary: {'Yes (p < 0.05)' if adf_result[1] < 0.05 else 'No (p >= 0.05)'}")

In [None]:
# 2x2 diagnostic grid: return ACF, return PACF, squared-return ACF, rolling vol
fig, axes = plt.subplots(2, 2, figsize=(15, 10))
(ax_acf, ax_pacf), (ax_sq_acf, ax_vol) = axes
title_style = dict(fontsize=11, fontweight='bold')

# Top-left: serial correlation of the log returns
plot_acf(returns, lags=50, ax=ax_acf)
ax_acf.set_title('ACF - Log Returns (1h bars)', **title_style)

# Top-right: partial autocorrelation of the log returns
plot_pacf(returns, lags=50, ax=ax_pacf)
ax_pacf.set_title('PACF - Log Returns (1h bars)', **title_style)

# Bottom-left: ACF of squared returns, a volatility-clustering diagnostic
squared_returns = returns**2
plot_acf(squared_returns, lags=50, ax=ax_sq_acf)
ax_sq_acf.set_title('ACF - Squared Returns (Volatility)', **title_style)

# Bottom-right: standard deviation over a 24-bar rolling window, in percent
rolling_vol = returns.rolling(window=24).std() * 100
ax_vol.plot(rolling_vol.index, rolling_vol.values, linewidth=1, color='steelblue')
ax_vol.set_title('24h Rolling Volatility', **title_style)
ax_vol.set_xlabel('Date')
ax_vol.set_ylabel('Volatility (%)')
ax_vol.grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

print("[OK] Autocorrelation analysis complete")

In [None]:
# Calculate autocorrelation of the log returns at specific lags (in bars)
lags_to_check = [1, 2, 4, 8, 24, 48]
autocorr_values = [returns.autocorr(lag=lag) for lag in lags_to_check]

print("Autocorrelation at Specific Lags:")
for lag, autocorr in zip(lags_to_check, autocorr_values):
    print(f"  Lag {lag:2d}: {autocorr:7.4f}")

# Interpretation
if max(abs(ac) for ac in autocorr_values) < 0.05:
    print("\nInterpretation: No significant autocorrelation - random walk behavior")
elif any(ac > 0.1 for ac in autocorr_values):
    print("\nInterpretation: Positive autocorrelation - momentum signals may exist")
elif any(ac < -0.1 for ac in autocorr_values):
    print("\nInterpretation: Negative autocorrelation - mean reversion signals may exist")
else:
    # Without this branch, coefficients whose largest magnitude lies between
    # 0.05 and 0.1 would produce no interpretation line at all.
    print("\nInterpretation: Weak autocorrelation (|r| between 0.05 and 0.1) - inconclusive")

## 6. Range and Volatility Analysis

In [None]:
# Calculate range metrics
df_clean['range'] = df_clean['high'] - df_clean['low']
df_clean['range_pct'] = (df_clean['range'] / df_clean['open'] * 100)
df_clean['hl2'] = (df_clean['high'] + df_clean['low']) / 2

# True range = max(high - low, |high - prev close|, |low - prev close|).
# The first bar has no previous close; np.fmax ignores the resulting NaN and
# falls back to high - low there, whereas np.maximum would propagate the NaN
# and leave the first true range undefined.
prev_close = df_clean['close'].shift(1)
df_clean['true_range'] = np.fmax(
    df_clean['high'] - df_clean['low'],
    np.fmax(
        (df_clean['high'] - prev_close).abs(),
        (df_clean['low'] - prev_close).abs()
    )
)

print("Range Statistics:")
print(f"Mean range: {df_clean['range'].mean():.2f} points")
print(f"Median range: {df_clean['range'].median():.2f} points")
print(f"Std dev range: {df_clean['range'].std():.2f} points")
print(f"Min range: {df_clean['range'].min():.2f} points")
print(f"Max range: {df_clean['range'].max():.2f} points")

print("\nRange % Statistics:")
print(f"Mean range %: {df_clean['range_pct'].mean():.4f}%")
print(f"Median range %: {df_clean['range_pct'].median():.4f}%")
print(f"Std dev range %: {df_clean['range_pct'].std():.4f}%")

In [None]:
# 2x2 overview of bar ranges: histogram, time series, percent histogram, ATR
fig, axes = plt.subplots(2, 2, figsize=(15, 10))
(ax_hist, ax_series), (ax_pct, ax_atr) = axes
bold = dict(fontweight='bold')

# Top-left: distribution of the absolute bar range with its mean marked
mean_range = df_clean['range'].mean()
ax_hist.hist(df_clean['range'], bins=50, color='steelblue', edgecolor='black', alpha=0.7)
ax_hist.axvline(mean_range, color='red', linestyle='--', linewidth=2, label=f'Mean: {mean_range:.2f}')
ax_hist.set_xlabel('Range (points)', **bold)
ax_hist.set_ylabel('Frequency', **bold)
ax_hist.set_title('Range Distribution', **bold)
ax_hist.legend()
ax_hist.grid(True, alpha=0.3)

# Top-right: range per bar with a 24-bar moving average overlaid
range_ma = df_clean['range'].rolling(24).mean()
ax_series.plot(df_clean.index, df_clean['range'], linewidth=0.8, color='steelblue', alpha=0.7)
ax_series.plot(df_clean.index, range_ma, linewidth=2, color='red', label='24h MA')
ax_series.set_xlabel('Date', **bold)
ax_series.set_ylabel('Range (points)', **bold)
ax_series.set_title('Range Over Time', **bold)
ax_series.legend()
ax_series.grid(True, alpha=0.3)

# Bottom-left: distribution of the range as a percentage of the open
mean_range_pct = df_clean['range_pct'].mean()
ax_pct.hist(df_clean['range_pct'], bins=50, color='green', edgecolor='black', alpha=0.7)
ax_pct.axvline(mean_range_pct, color='red', linestyle='--', linewidth=2, label=f'Mean: {mean_range_pct:.4f}%')
ax_pct.set_xlabel('Range %', **bold)
ax_pct.set_ylabel('Frequency', **bold)
ax_pct.set_title('Range % Distribution', **bold)
ax_pct.legend()
ax_pct.grid(True, alpha=0.3)

# Bottom-right: 14-bar simple average of true range with a +/-50% band
atr = df_clean['true_range'].rolling(14).mean()
ax_atr.plot(df_clean.index, atr, linewidth=1.5, color='purple', label='ATR(14)')
ax_atr.fill_between(df_clean.index, atr * 0.5, atr * 1.5, alpha=0.2, color='purple')
ax_atr.set_xlabel('Date', **bold)
ax_atr.set_ylabel('ATR (points)', **bold)
ax_atr.set_title('Average True Range (14)', **bold)
ax_atr.legend()
ax_atr.grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

## 7. Conclusions and Summary

In [None]:
# Final report: gathers the figures computed in the earlier cells
print("="*70)
print("DE40 TREND AND RANGE ANALYSIS - SUMMARY")
print("="*70)

print("\n1. DATA QUALITY:")
print(f"   * Period Analyzed: {df_clean.index[0].date()} to {df_clean.index[-1].date()}")
print(f"   * Total Candles: {len(df_clean)}")
# missing_after holds per-column null counts, so its sum is a count of null
# cells, not of rows.
total_missing = missing_after.sum()
print(f"   * Missing Data: {total_missing} null values (fully imputed: {'Yes' if total_missing == 0 else 'No'})")
print(f"   * Gap Assessment: {gap_percentage_clean:.2f}% missing (during market hours)")

# Annualise with the bar rate measured from the data. A fixed 252*24 assumes
# 24 bars per trading day, which overstates volatility when the frame holds
# trading-session bars only.
summary_span_years = (returns.index[-1] - returns.index[0]).total_seconds() / (365.25 * 24 * 3600)
summary_bars_per_year = len(returns) / summary_span_years if summary_span_years > 0 else float('nan')

print("\n2. MARKET EFFICIENCY (HURST EXPONENT):")
print(f"   * H-value: {hurst:.4f}")
print(f"   * Type: {interpretation}")
print(f"   * Volatility: Annualized = {returns.std() * np.sqrt(summary_bars_per_year) * 100:.2f}%")

print("\n3. AUTOCORRELATION:")
for lag, autocorr in zip(lags_to_check, autocorr_values):
    significance = "***" if abs(autocorr) > 0.1 else "*" if abs(autocorr) > 0.05 else ""
    print(f"   * Lag {lag:2d}: {autocorr:7.4f} {significance}")

# Volatility clustering shows up as positive autocorrelation in SQUARED
# returns; autocorrelation of the raw returns measures momentum instead.
squared_returns_summary = returns ** 2
vol_cluster_acf = [squared_returns_summary.autocorr(lag=lag) for lag in lags_to_check[:4]]

print("\n4. VOLATILITY & RANGE:")
print(f"   * Mean Range: {df_clean['range'].mean():.2f} points ({df_clean['range_pct'].mean():.4f}%)")
print(f"   * Range Std Dev: {df_clean['range'].std():.2f} points")
print(f"   * ATR(14): {atr.iloc[-1]:.2f} points")
print(f"   * Volatility Clustering: {'Yes' if any(ac > 0.1 for ac in vol_cluster_acf) else 'No significant'}")

print("\n5. TRADING IMPLICATIONS:")
if hurst < 0.5:
    print("   -> Mean-reversion strategies may work (H < 0.5)")
    print("   -> Implement range-trading on intraday levels")
else:
    # This branch also covers H == 0.5, so label it accordingly.
    print("   -> Trending/momentum strategies may work (H >= 0.5)")
    print("   -> Follow breakouts and maintain directional bias")

print("\n" + "="*70)