# Concept Verification - Visual Overlay

Visual verification of all SMC concept detection modules on real NAS100 data.
Each concept is overlaid on a candlestick chart to verify correct detection.

In [None]:
import sys
sys.path.insert(0, '..')

import pandas as pd
import numpy as np
import plotly.graph_objects as go
from plotly.subplots import make_subplots

from data.loader import load_instrument
from data.resampler import resample
from visualization.chart import candlestick_chart, add_markers, add_zone
from concepts.fractals import detect_swings, get_swing_points
from concepts.structure import detect_bos_choch, detect_cisd
from concepts.fvg import detect_fvg, track_fvg_lifecycle
from concepts.orderblocks import detect_orderblocks
from concepts.liquidity import detect_equal_levels, detect_session_levels
from concepts.zones import premium_discount_zones, classify_price_zone

print('All imports OK')

In [2]:
# Load NAS100 1-minute data
df_1m = load_instrument('NAS100')
print(f'Loaded {len(df_1m):,} rows of 1m data')
print(f'Date range: {df_1m["time"].iloc[0]} to {df_1m["time"].iloc[-1]}')

# Resample to 15m for cleaner visualization
df_15m = resample(df_1m, '15m')
print(f'Resampled to {len(df_15m):,} rows of 15m data')

# Use a 2-week slice for detailed views
SLICE_START = 500
SLICE_END = 1000
df_view = df_15m.iloc[SLICE_START:SLICE_END].copy()
print(f'View slice: {len(df_view)} bars (15m)')

Loaded 1,103,312 rows of 1m data
Date range: 2022-12-30 15:49:00+00:00 to 2026-02-13 14:35:00+00:00
Resampled to 73,953 rows of 15m data
View slice: 500 bars (15m)


## 1. Fractals (Swing Highs / Swing Lows)

Swing highs = triangle markers above candles, swing lows = below.

In [3]:
swings = detect_swings(df_view, swing_length=5)
points = get_swing_points(df_view, swings)

sh = points[points['direction'] == 1]
sl = points[points['direction'] == -1]
print(f'Swing Highs: {len(sh)}, Swing Lows: {len(sl)}')

fig = candlestick_chart(df_view, title='NAS100 15m - Fractals (Swing Highs/Lows)', height=600)

# Add swing high markers
sh_times = df_view.loc[sh['orig_index'], 'time'].values if 'time' in df_view.columns else sh['orig_index'].values
sh_prices = sh['level'].values
fig.add_trace(go.Scatter(
    x=sh_times, y=sh_prices,
    mode='markers', name='Swing High',
    marker=dict(symbol='triangle-down', size=10, color='red')
))

# Add swing low markers
sl_times = df_view.loc[sl['orig_index'], 'time'].values if 'time' in df_view.columns else sl['orig_index'].values
sl_prices = sl['level'].values
fig.add_trace(go.Scatter(
    x=sl_times, y=sl_prices,
    mode='markers', name='Swing Low',
    marker=dict(symbol='triangle-up', size=10, color='green')
))

fig.show()

Swing Highs: 32, Swing Lows: 31


## 2. Market Structure (BOS / CHoCH)

BOS = continuation break (blue), CHoCH = reversal break (orange).

In [4]:
events = detect_bos_choch(df_view, swing_length=5, close_break=True)
print(f'Structure events: {len(events)}')
if len(events) > 0:
    print(events[['type', 'direction', 'broken_level']].head(10))

fig = candlestick_chart(df_view, title='NAS100 15m - BOS/CHoCH', height=600)

for _, ev in events.iterrows():
    t = str(ev['type'])
    is_bos = 'BOS' in t
    color = 'blue' if is_bos else 'orange'
    label = 'BOS' if is_bos else 'CHoCH'
    direction = 'up' if ev['direction'] == 1 else 'down'
    
    broken_idx = ev['broken_index']
    swing_idx = ev['swing_index']
    
    if broken_idx in df_view.index and swing_idx in df_view.index:
        x0 = df_view.loc[swing_idx, 'time'] if 'time' in df_view.columns else swing_idx
        x1 = df_view.loc[broken_idx, 'time'] if 'time' in df_view.columns else broken_idx
        y = ev['broken_level']
        
        fig.add_trace(go.Scatter(
            x=[x0, x1], y=[y, y],
            mode='lines+text',
            line=dict(color=color, width=2, dash='dash'),
            text=[f'{label} {direction}', ''],
            textposition='top center',
            showlegend=False,
        ))

fig.show()

Structure events: 30
                  type  direction  broken_level
0    StructureType.BOS          1      11119.13
1  StructureType.CHOCH         -1      11075.38
2  StructureType.CHOCH          1      11128.75
3    StructureType.BOS          1      11205.50
4    StructureType.BOS          1      11212.75
5  StructureType.CHOCH         -1      11196.75
6  StructureType.CHOCH          1      11211.50
7  StructureType.CHOCH         -1      11196.25
8  StructureType.CHOCH          1      11215.25
9    StructureType.BOS          1      11242.50


## 3. Fair Value Gaps (FVGs)

Bullish FVG = green rectangle, Bearish FVG = red rectangle.

In [None]:
fvgs = detect_fvg(df_view, min_gap_pct=0.0005, join_consecutive=True)
lifecycle = track_fvg_lifecycle(df_view, fvgs, mitigation_mode="close", max_age_bars=192)

print(f'FVGs detected: {len(fvgs)}')
if len(fvgs) > 0:
    print(f'  Bullish: {(fvgs["direction"] == 1).sum()}')
    print(f'  Bearish: {(fvgs["direction"] == -1).sum()}')
    # Lifecycle summary
    from collections import Counter
    statuses = Counter(r['status'] for r in lifecycle)
    print(f'  Lifecycle: {dict(statuses)}')

fig = candlestick_chart(df_view, title='NAS100 15m - Fair Value Gaps (Lifecycle)', height=600)

for lc in lifecycle:
    start_idx = lc['start_index']
    end_idx = lc['end_index']
    direction = lc['direction']

    # Skip if start_index not in our view
    if start_idx not in df_view.index:
        continue

    x0 = df_view.loc[start_idx, 'time'] if 'time' in df_view.columns else start_idx
    x1 = df_view.loc[end_idx, 'time'] if end_idx in df_view.index else (
        df_view['time'].iloc[-1] if 'time' in df_view.columns else df_view.index[-1])

    top = lc['top']
    bottom = lc['bottom']
    midpoint = lc['midpoint']
    fill_level = lc['fill_level']
    status = str(lc['status'])
    inversion_idx = lc['inversion_index']

    # Colors based on direction
    if direction == 1:
        active_fill = 'rgba(0,200,0,0.15)'
        active_border = 'rgba(0,200,0,0.5)'
    else:
        active_fill = 'rgba(200,0,0,0.15)'
        active_border = 'rgba(200,0,0,0.5)'

    gray_fill = 'rgba(128,128,128,0.15)'
    gray_border = 'rgba(128,128,128,0.4)'

    if 'INVERTED' in status and inversion_idx is not None and inversion_idx in df_view.index:
        # Before inversion: active color from start to inversion
        x_inv = df_view.loc[inversion_idx, 'time'] if 'time' in df_view.columns else inversion_idx
        fig.add_shape(type='rect', x0=x0, x1=x_inv, y0=bottom, y1=top,
                      fillcolor=active_fill, line=dict(color=active_border, width=1))
        # After inversion: gray IFVG from inversion to end
        fig.add_shape(type='rect', x0=x_inv, x1=x1, y0=bottom, y1=top,
                      fillcolor=gray_fill, line=dict(color=gray_border, width=1))

    elif 'PARTIALLY_FILLED' in status and fill_level is not None:
        # Split: gray filled portion + colored active portion
        if direction == 1:
            # Bullish: filled from top down to fill_level (gray), active from fill_level to bottom (green)
            clamped_fill = max(min(fill_level, top), bottom)
            fig.add_shape(type='rect', x0=x0, x1=x1, y0=clamped_fill, y1=top,
                          fillcolor=gray_fill, line=dict(color=gray_border, width=1))
            fig.add_shape(type='rect', x0=x0, x1=x1, y0=bottom, y1=clamped_fill,
                          fillcolor=active_fill, line=dict(color=active_border, width=1))
        else:
            # Bearish: filled from bottom up to fill_level (gray), active from fill_level to top (red)
            clamped_fill = max(min(fill_level, top), bottom)
            fig.add_shape(type='rect', x0=x0, x1=x1, y0=bottom, y1=clamped_fill,
                          fillcolor=gray_fill, line=dict(color=gray_border, width=1))
            fig.add_shape(type='rect', x0=x0, x1=x1, y0=clamped_fill, y1=top,
                          fillcolor=active_fill, line=dict(color=active_border, width=1))
    else:
        # FRESH, TESTED, FULLY_FILLED: single rectangle
        if 'FULLY_FILLED' in status:
            fig.add_shape(type='rect', x0=x0, x1=x1, y0=bottom, y1=top,
                          fillcolor=gray_fill, line=dict(color=gray_border, width=1))
        else:
            fig.add_shape(type='rect', x0=x0, x1=x1, y0=bottom, y1=top,
                          fillcolor=active_fill, line=dict(color=active_border, width=1))

    # CE midpoint line (only on active, non-fully-filled FVGs)
    if 'FULLY_FILLED' not in status and 'INVERTED' not in status:
        fig.add_shape(type='line', x0=x0, x1=x1, y0=midpoint, y1=midpoint,
                      line=dict(color=active_border, width=1, dash='dot'))

fig.show()

## 4. Order Blocks

Bullish OB (demand) = green, Bearish OB (supply) = red.

In [6]:
events = detect_bos_choch(df_view, swing_length=5)
obs = detect_orderblocks(df_view, events)
print(f'Order Blocks: {len(obs)}')
if len(obs) > 0:
    print(f'  Bullish: {(obs["direction"] == 1).sum()}')
    print(f'  Bearish: {(obs["direction"] == -1).sum()}')

fig = candlestick_chart(df_view, title='NAS100 15m - Order Blocks', height=600)

if 'time' in df_view.columns:
    last_time = df_view['time'].iloc[-1]
else:
    last_time = df_view.index[-1]

for _, ob in obs.iterrows():
    ob_idx = ob['ob_index']
    if ob_idx not in df_view.index:
        continue
    
    x0 = df_view.loc[ob_idx, 'time'] if 'time' in df_view.columns else ob_idx
    color = 'rgba(0,150,0,0.2)' if ob['direction'] == 1 else 'rgba(150,0,0,0.2)'
    border = 'rgba(0,150,0,0.7)' if ob['direction'] == 1 else 'rgba(150,0,0,0.7)'
    
    fig.add_shape(
        type='rect',
        x0=x0, x1=last_time,
        y0=ob['bottom'], y1=ob['top'],
        fillcolor=color,
        line=dict(color=border, width=2),
    )

fig.show()

Order Blocks: 0


## 5. Combined View: All Concepts

Overlay fractals + structure + FVGs + OBs on a single chart.

In [None]:
# Shorter slice for readability
df_combined = df_15m.iloc[SLICE_START:SLICE_START+200].copy()

# Detect all concepts
swings = detect_swings(df_combined, swing_length=5)
points = get_swing_points(df_combined, swings)
events = detect_bos_choch(df_combined, swing_length=5)
fvgs = detect_fvg(df_combined, min_gap_pct=0.0005)
lifecycle = track_fvg_lifecycle(df_combined, fvgs, mitigation_mode="close", max_age_bars=192)
obs = detect_orderblocks(df_combined, events)

fig = candlestick_chart(df_combined, title='NAS100 15m - All Concepts Combined', height=700)

# Swing points
sh = points[points['direction'] == 1]
sl = points[points['direction'] == -1]

if len(sh) > 0:
    sh_times = df_combined.loc[sh['orig_index'].values, 'time'].values if 'time' in df_combined.columns else sh['orig_index'].values
    fig.add_trace(go.Scatter(x=sh_times, y=sh['level'].values, mode='markers', name='Swing High',
                             marker=dict(symbol='triangle-down', size=10, color='red')))

if len(sl) > 0:
    sl_times = df_combined.loc[sl['orig_index'].values, 'time'].values if 'time' in df_combined.columns else sl['orig_index'].values
    fig.add_trace(go.Scatter(x=sl_times, y=sl['level'].values, mode='markers', name='Swing Low',
                             marker=dict(symbol='triangle-up', size=10, color='green')))

# BOS/CHoCH lines
for _, ev in events.iterrows():
    t = str(ev['type'])
    is_bos = 'BOS' in t
    color = 'blue' if is_bos else 'orange'
    broken_idx = ev['broken_index']
    swing_idx = ev['swing_index']
    if broken_idx in df_combined.index and swing_idx in df_combined.index:
        x0 = df_combined.loc[swing_idx, 'time'] if 'time' in df_combined.columns else swing_idx
        x1 = df_combined.loc[broken_idx, 'time'] if 'time' in df_combined.columns else broken_idx
        fig.add_trace(go.Scatter(x=[x0, x1], y=[ev['broken_level']]*2,
                                 mode='lines', line=dict(color=color, width=2, dash='dash'), showlegend=False))

# FVG rectangles with lifecycle
for lc in lifecycle:
    start_idx = lc['start_index']
    end_idx = lc['end_index']
    if start_idx not in df_combined.index:
        continue
    x0 = df_combined.loc[start_idx, 'time'] if 'time' in df_combined.columns else start_idx
    x1 = df_combined.loc[end_idx, 'time'] if end_idx in df_combined.index else (
        df_combined['time'].iloc[-1] if 'time' in df_combined.columns else df_combined.index[-1])
    status = str(lc['status'])
    fill_level = lc['fill_level']
    inversion_idx = lc['inversion_index']

    if lc['direction'] == 1:
        active_fill = 'rgba(0,200,0,0.1)'
    else:
        active_fill = 'rgba(200,0,0,0.1)'
    gray_fill = 'rgba(128,128,128,0.1)'

    if 'INVERTED' in status and inversion_idx is not None and inversion_idx in df_combined.index:
        x_inv = df_combined.loc[inversion_idx, 'time'] if 'time' in df_combined.columns else inversion_idx
        fig.add_shape(type='rect', x0=x0, x1=x_inv, y0=lc['bottom'], y1=lc['top'],
                      fillcolor=active_fill, line=dict(width=0))
        fig.add_shape(type='rect', x0=x_inv, x1=x1, y0=lc['bottom'], y1=lc['top'],
                      fillcolor=gray_fill, line=dict(width=0))
    elif 'PARTIALLY_FILLED' in status and fill_level is not None:
        clamped = max(min(fill_level, lc['top']), lc['bottom'])
        if lc['direction'] == 1:
            fig.add_shape(type='rect', x0=x0, x1=x1, y0=clamped, y1=lc['top'],
                          fillcolor=gray_fill, line=dict(width=0))
            fig.add_shape(type='rect', x0=x0, x1=x1, y0=lc['bottom'], y1=clamped,
                          fillcolor=active_fill, line=dict(width=0))
        else:
            fig.add_shape(type='rect', x0=x0, x1=x1, y0=lc['bottom'], y1=clamped,
                          fillcolor=gray_fill, line=dict(width=0))
            fig.add_shape(type='rect', x0=x0, x1=x1, y0=clamped, y1=lc['top'],
                          fillcolor=active_fill, line=dict(width=0))
    else:
        fill = gray_fill if 'FULLY_FILLED' in status else active_fill
        fig.add_shape(type='rect', x0=x0, x1=x1, y0=lc['bottom'], y1=lc['top'],
                      fillcolor=fill, line=dict(width=0))

# OB rectangles
for _, ob in obs.iterrows():
    ob_idx = ob['ob_index']
    if ob_idx not in df_combined.index:
        continue
    x0 = df_combined.loc[ob_idx, 'time'] if 'time' in df_combined.columns else ob_idx
    # OBs extend to end of chart (until broken/mitigated in future work)
    x1 = df_combined['time'].iloc[-1] if 'time' in df_combined.columns else df_combined.index[-1]
    color = 'rgba(0,100,200,0.15)' if ob['direction'] == 1 else 'rgba(200,100,0,0.15)'
    border = 'rgba(0,100,200,0.6)' if ob['direction'] == 1 else 'rgba(200,100,0,0.6)'
    fig.add_shape(type='rect', x0=x0, x1=x1, y0=ob['bottom'], y1=ob['top'], fillcolor=color,
                  line=dict(color=border, width=2))

print(f'Concepts: {len(points)} swings, {len(events)} structure, {len(fvgs)} FVGs, {len(obs)} OBs')
fig.update_layout(legend=dict(yanchor='top', y=0.99, xanchor='left', x=0.01))
fig.show()

## 6. Premium/Discount Zones

Shows equilibrium, premium (above 50%), and discount (below 50%) zones.

In [None]:
# Use swing range from the view slice
sh_pts = points[points['direction'] == 1]
sl_pts = points[points['direction'] == -1]

if len(sh_pts) > 0 and len(sl_pts) > 0:
    swing_high = sh_pts['level'].max()
    swing_low = sl_pts['level'].min()
    zones = premium_discount_zones(swing_high, swing_low)
    
    print(f'Range: {swing_low:.2f} - {swing_high:.2f}')
    print(f'Equilibrium: {zones["equilibrium"]:.2f}')
    print(f'Premium zone: {zones["premium_zone"]}')
    print(f'Discount zone: {zones["discount_zone"]}')
    
    fig = candlestick_chart(df_combined, title='NAS100 15m - Premium/Discount Zones', height=600)
    
    x0 = df_combined['time'].iloc[0] if 'time' in df_combined.columns else df_combined.index[0]
    x1 = df_combined['time'].iloc[-1] if 'time' in df_combined.columns else df_combined.index[-1]
    
    # Premium zone (red)
    fig.add_shape(type='rect', x0=x0, x1=x1,
                  y0=zones['equilibrium'], y1=swing_high,
                  fillcolor='rgba(255,0,0,0.05)', line=dict(width=0))
    
    # Discount zone (green)
    fig.add_shape(type='rect', x0=x0, x1=x1,
                  y0=swing_low, y1=zones['equilibrium'],
                  fillcolor='rgba(0,255,0,0.05)', line=dict(width=0))
    
    # Equilibrium line
    fig.add_hline(y=zones['equilibrium'], line_color='gray', line_dash='dash',
                  annotation_text='Equilibrium (50%)')
    fig.add_hline(y=zones['quarter_75'], line_color='red', line_dash='dot',
                  annotation_text='75% (Premium)')
    fig.add_hline(y=zones['quarter_25'], line_color='green', line_dash='dot',
                  annotation_text='25% (Discount)')
    
    fig.show()
else:
    print('Not enough swing points for zone analysis')

## 7. Detection Statistics Summary

In [None]:
# Run on the full 15m dataset
full_swings = detect_swings(df_15m, swing_length=5)
full_points = get_swing_points(df_15m, full_swings)
full_events = detect_bos_choch(df_15m, swing_length=5)
full_cisd = detect_cisd(df_15m)
full_fvgs = detect_fvg(df_15m, min_gap_pct=0.0005)
full_obs = detect_orderblocks(df_15m, full_events)
full_eq = detect_equal_levels(df_15m, swing_length=5)
full_sessions = detect_session_levels(df_15m, level_type='daily')

print('='*50)
print(f'DETECTION SUMMARY - NAS100 15m ({len(df_15m):,} bars)')
print('='*50)
print(f'Swing Points:    {len(full_points):>6} (H={full_points["direction"].eq(1).sum()}, L={full_points["direction"].eq(-1).sum()})')
print(f'Structure Events:{len(full_events):>6}')
print(f'CISD Events:     {len(full_cisd):>6}')
print(f'FVGs:            {len(full_fvgs):>6} (Bull={full_fvgs["direction"].eq(1).sum() if len(full_fvgs) else 0}, Bear={full_fvgs["direction"].eq(-1).sum() if len(full_fvgs) else 0})')
print(f'Order Blocks:    {len(full_obs):>6} (Bull={full_obs["direction"].eq(1).sum() if len(full_obs) else 0}, Bear={full_obs["direction"].eq(-1).sum() if len(full_obs) else 0})')
print(f'Equal Levels:    {len(full_eq):>6}')
print(f'Session Levels:  {len(full_sessions):>6}')