# Data Consistency Testing - Comprehensive Test Suite

This notebook covers the following areas:
1. Real-time download with validation and retry
2. InfluxDB write verification
3. Post-hoc coherence checking
4. Recovery mechanisms

In [None]:
# Cell 1: HTTP Logging Wrapper

def enable_http_logging_for_class(ClientClass):
    """Wrap ``ClientClass._make_request`` so every call prints its URL and outcome.

    On success the wrapper prints ``[TD-HTTP] GET <url> -> OK in <ms> ms``; on
    failure it prints ``[TD-HTTP] GET <url> -> ERROR: <exc>`` and re-raises the
    original exception.  Calling this more than once on the same class is a
    no-op, so re-running the notebook cell does not stack wrappers (which would
    duplicate every log line).

    Args:
        ClientClass: class whose ``_make_request(self, endpoint, params)`` is an
            async method returning a ``(resp_text, raw_url)`` pair.
    """
    import functools
    import time
    from urllib.parse import urlencode, urlsplit

    orig_make_request = ClientClass._make_request
    if getattr(orig_make_request, "_td_patched", False):
        return  # already wrapped by this function

    def _describe_url(self, endpoint, params, raw_url=None):
        """Build a best-effort URL string for the log line; never raises."""
        try:
            if raw_url:
                base = str(raw_url)
            else:
                base = (getattr(self, "base_url", "") or "").rstrip("/") + (endpoint or "")
            if urlsplit(base).query:
                # raw_url already carries its query string; don't append it twice.
                return base
            qs = urlencode({k: v for k, v in (params or {}).items() if v is not None})
            return f"{base}?{qs}" if qs else base
        except Exception:
            return f"{endpoint!r} params={params!r}"

    @functools.wraps(orig_make_request)
    async def wrapped(self, endpoint, params):
        t0 = time.perf_counter()
        # Only the request itself is guarded: a problem while formatting the
        # log line must never turn a successful response into a raised error.
        try:
            result = await orig_make_request(self, endpoint, params)
        except Exception as e:
            print(f"[TD-HTTP] GET {_describe_url(self, endpoint, params)} -> ERROR: {e}")
            raise
        dt = (time.perf_counter() - t0) * 1000

        resp_text, raw_url = result
        full = _describe_url(self, endpoint, params, raw_url)
        print(f"[TD-HTTP] GET {full} -> OK in {dt:.1f} ms")
        return resp_text, raw_url

    wrapped._td_patched = True
    ClientClass._make_request = wrapped


def enable_http_logging():
    """
    Idempotently monkey-patch aiohttp.ClientSession._request to log final URLs (with query).

    Each request prints ``[TD-HTTP] <METHOD> <url> -> <status> in <ms> ms`` on
    success, or ``[TD-HTTP] <METHOD> <url> -> ERROR <status>: <exc> in <ms> ms``
    before re-raising on failure. If already patched, it won't patch twice.
    """
    import functools
    import time
    import aiohttp
    from yarl import URL

    if getattr(aiohttp.ClientSession._request, "_td_patched", False):
        return  # already patched

    original = aiohttp.ClientSession._request

    def _fallback_url(path_or_url, kwargs):
        """Rebuild the request URL from the call arguments; never raises."""
        try:
            q = kwargs.get("params")
            return str(URL(path_or_url).with_query(q)) if q else str(path_or_url)
        except Exception:
            return str(path_or_url)

    @functools.wraps(original)
    async def wrapped(self, method, path_or_url, *args, **kwargs):
        t0 = time.perf_counter()
        # Only the request itself is guarded, so a logging/formatting problem
        # can never be reported as a request error (or drop the response).
        try:
            res = await original(self, method, path_or_url, *args, **kwargs)
        except Exception as e:
            dt = (time.perf_counter() - t0) * 1000
            final_url = _fallback_url(path_or_url, kwargs)
            status = getattr(e, "status", "")
            print(f"[TD-HTTP] {method} {final_url} -> ERROR {status}: {e} in {dt:.1f} ms")
            raise
        dt = (time.perf_counter() - t0) * 1000
        try:
            final_url = str(res.url)
        except Exception:
            final_url = _fallback_url(path_or_url, kwargs)
        print(f"[TD-HTTP] {method} {final_url} -> {res.status} in {dt:.1f} ms")
        return res

    wrapped._td_patched = True
    aiohttp.ClientSession._request = wrapped

enable_http_logging()

In [None]:
# Cell 2: Configuration and Task Setup

import os
from src.tdSynchManager.credentials import get_influx_credentials
from src.tdSynchManager.ThetaDataV3Client import ThetaDataV3Client
from src.tdSynchManager.manager import ThetaSyncManager
from src.tdSynchManager.config import ManagerConfig, Task, DiscoverPolicy
from src.tdSynchManager.utils import install_td_server_error_logger

# Get InfluxDB credentials
influx = get_influx_credentials()
influx_token = influx['token']
influx_url = influx.get('url', 'http://127.0.0.1:8181')
influx_bucket = influx.get('bucket', 'ThetaData')

symbols = ["TLRY"]  # Test symbol

cfg = ManagerConfig(
    root_dir="tests/data",
    max_concurrency=80,
    max_file_mb=16,
    overlap_seconds=60,
    influx_url=influx_url,
    influx_bucket=influx_bucket,
    influx_token=influx_token,
    influx_org=None,
    influx_precision="nanosecond",
    influx_measure_prefix="",
    influx_write_batch=5000,
    
    # Enable validation with strict mode
    enable_data_validation=True,
    validation_strict_mode=True,  # All-or-nothing persistence
)

tasks = [
    Task(
        asset="option",
        symbols=symbols,
        intervals=["1d"],
        sink="influxdb",
        enrich_bar_greeks=True,
        enrich_tick_greeks=True,
        first_date_override="20250821",
        ignore_existing=False,
        discover_policy=DiscoverPolicy(mode="skip")
    ),
    Task(
        asset="option",
        symbols=symbols,
        intervals=["5m"],
        sink="influxdb",
        enrich_bar_greeks=True,
        enrich_tick_greeks=True,
        first_date_override="20250821",
        ignore_existing=False,
        discover_policy=DiscoverPolicy(mode="skip")
    ),
]

print("Configuration loaded successfully")
print(f"Testing with symbols: {symbols}")
print(f"Validation enabled: {cfg.enable_data_validation}")
print(f"Strict mode: {cfg.validation_strict_mode}")

In [None]:
# Cell 3: Test Real-Time Download with Validation and Retry

print("\n" + "="*80)
print("TEST 1: Real-Time Download with Validation and Retry")
print("="*80)

async with ThetaDataV3Client() as client:
    install_td_server_error_logger(client)
    manager = ThetaSyncManager(cfg, client=client)
    
    print("\n[TEST] Running download tasks...")
    print("Expected behavior:")
    print("  - Download data for TLRY options")
    print("  - Validate completeness (candles, columns, volume)")
    print("  - Retry up to N times if validation fails")
    print("  - Write to InfluxDB with verification")
    print("  - Retry only missing rows if partial write detected")
    print("")
    
    await manager.run(tasks)
    
print("\n[TEST] Download completed. Check logs above for:")
print("  ✓ [VALIDATION] messages showing validation results")
print("  ✓ [INFLUX][WRITE] messages showing write operations")
print("  ✓ Retry attempts if validation failed")
print("  ✓ Verification after write")

In [None]:
# Cell 4: List Available Measurements in InfluxDB

from influxdb_client_3 import InfluxDBClient3

print("\n" + "="*80)
print("Available InfluxDB Measurements")
print("="*80 + "\n")

client = InfluxDBClient3(
    host=influx_url,
    database=influx_bucket,
    token=influx_token
)

try:
    # Query to list all tables
    query = "SHOW TABLES"
    result = client.query(query)
    df_tables = result.to_pandas()

    # SHOW TABLES also lists InfluxDB's own `system` and `information_schema`
    # tables; drop them so only user measurements are counted and checked.
    if 'table_schema' in df_tables.columns:
        internal = df_tables['table_schema'].isin(['system', 'information_schema'])
        df_tables = df_tables[~internal].reset_index(drop=True)

    print(df_tables)
    print(f"\nTotal measurements: {len(df_tables)}")

    # Store for next cell
    measurements = df_tables['table_name'].tolist()

except Exception as e:
    print(f"Error listing tables: {e}")
    measurements = []
finally:
    client.close()

In [None]:
# Cell 5: Test Post-Hoc Coherence Checking on All DBs

from src.tdSynchManager.coherence import CoherenceChecker
import pandas as pd

print("\n" + "="*80)
print("TEST 2: Post-Hoc Coherence Checking on All Measurements")
print("="*80)

async with ThetaDataV3Client() as client:
    install_td_server_error_logger(client)
    manager = ThetaSyncManager(cfg, client=client)
    
    checker = CoherenceChecker(manager)
    
    # Test on all available measurements
    for measurement in measurements:
        print(f"\n{'='*60}")
        print(f"Checking: {measurement}")
        print(f"{'='*60}")
        
        try:
            # Parse measurement name: {symbol}-{asset}-{interval}
            parts = measurement.split('-')
            if len(parts) >= 3:
                symbol = parts[0]
                asset = parts[1]
                interval = parts[2]
                
                # Determine date range to check (last 30 days)
                end_date = pd.Timestamp.now().date()
                start_date = end_date - pd.Timedelta(days=30)
                
                print(f"Symbol: {symbol}, Asset: {asset}, Interval: {interval}")
                print(f"Checking range: {start_date} to {end_date}")
                
                # Run coherence check
                issues = await checker.check_symbol_coherence(
                    symbol=symbol,
                    asset=asset,
                    interval=interval,
                    sink="influxdb",
                    start_date=str(start_date),
                    end_date=str(end_date)
                )
                
                if issues:
                    print(f"\n⚠️  Found {len(issues)} issues:")
                    for issue in issues:
                        print(f"  - {issue.issue_type}: {issue.description}")
                        if 'problem_segments' in issue.details:
                            for seg in issue.details['problem_segments'][:3]:  # Show first 3
                                print(f"    {seg.get('segment_start', 'N/A')}-{seg.get('segment_end', 'N/A')}: {seg.get('issue', 'N/A')}")
                else:
                    print("✅ No coherence issues found")
                    
        except Exception as e:
            print(f"❌ Error checking {measurement}: {e}")
            import traceback
            traceback.print_exc()

print("\n" + "="*80)
print("Coherence check completed")
print("="*80)

In [None]:
# Cell 6: Test InfluxDB Write Verification Directly

from src.tdSynchManager.influx_verification import verify_influx_write, InfluxWriteResult
import pandas as pd
import numpy as np

print("\n" + "="*80)
print("TEST 3: InfluxDB Write Verification (Direct Test)")
print("="*80)

# Create test DataFrame
test_data = pd.DataFrame({
    '__ts_utc': pd.date_range('2025-01-01', periods=10, freq='1H', tz='UTC'),
    'symbol': ['TEST'] * 10,
    'close': np.random.rand(10) * 100,
    'volume': np.random.randint(1000, 10000, 10)
})

print(f"\nTest data created: {len(test_data)} rows")
print(test_data.head())

# Connect to InfluxDB
influx_client = InfluxDBClient3(
    host=influx_url,
    database=influx_bucket,
    token=influx_token
)

measurement = "test_verification"

print(f"\nWriting to measurement: {measurement}")

# Write test data using line protocol
lines = []
for _, row in test_data.iterrows():
    ts_ns = int(row['__ts_utc'].value)
    line = f"{measurement},symbol={row['symbol']} close={row['close']},volume={row['volume']} {ts_ns}"
    lines.append(line)

try:
    influx_client.write(lines)
    print("✅ Write completed")
except Exception as e:
    print(f"❌ Write failed: {e}")

# Verify write
print("\nVerifying write...")

import asyncio

result = await verify_influx_write(
    influx_client=influx_client,
    measurement=measurement,
    df_original=test_data,
    key_cols=['__ts_utc', 'symbol'],
    time_col='__ts_utc'
)

print(f"\nVerification result:")
print(f"  Total attempted: {result.total_attempted}")
print(f"  Successfully written: {result.successfully_written}")
print(f"  Missing count: {result.missing_count}")
print(f"  Success: {result.success}")

if result.missing_count > 0:
    print(f"\n⚠️  Missing row indices: {result.missing_indices}")
else:
    print("\n✅ All rows verified successfully")

influx_client.close()

In [None]:
# Cell 7: Check Data Consistency Logs

import os
import glob

import pandas as pd  # don't depend on an earlier cell having imported pandas

print("\n" + "="*80)
print("Data Consistency Logs")
print("="*80 + "\n")

log_dir = os.path.join(cfg.root_dir, "logs")

if os.path.exists(log_dir):
    log_files = glob.glob(os.path.join(log_dir, "data_consistency_*.parquet"))

    if log_files:
        print(f"Found {len(log_files)} log files:\n")
        for log_path in sorted(log_files):
            print(f"  {os.path.basename(log_path)}")

        # Read and display most recent log
        latest_log = max(log_files, key=os.path.getmtime)
        print(f"\nLatest log: {os.path.basename(latest_log)}")

        # Keep the try around the read only, so later display problems are
        # not misreported as an unreadable log file.
        try:
            log_df = pd.read_parquet(latest_log)
        except Exception as e:
            print(f"Error reading log: {e}")
        else:
            print(f"\nLog entries: {len(log_df)}")
            print("\nRecent log entries:")
            print(log_df.tail(20).to_string())

            # Summary by event type
            print("\n" + "-"*80)
            print("Summary by event type:")
            if 'event_type' in log_df.columns:
                print(log_df['event_type'].value_counts())
            else:
                print("(log has no 'event_type' column)")
    else:
        print("No log files found")
else:
    print(f"Log directory not found: {log_dir}")

print("\n" + "="*80)
print("All tests completed")
print("="*80)