# Decimal128 Pipeline Testing

This notebook tests Polars decimal128 operations at scale. It also implements int64 "pips" as a fallback strategy: prices and quantities are stored as integers scaled by a per-symbol power of ten.

## Test Objectives

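1. Test loading a large dataset (target: ~10GB; a synthetic sample of configurable size is generated below) with Polars decimal128 columns
2. Perform group_by, join, and aggregation operations (join operations are listed here but not yet exercised below)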
3. Test Parquet write/read with decimal types
4. Implement int64 pips converter as fallback strategy
5. Benchmark both approaches and document performance differences

## Success Criteria

- Polars decimal128 operations work without fallback to object dtype, OR
- Int64 pips implementation validated as performant alternative

In [None]:
import json
import time
from decimal import Decimal, ROUND_HALF_UP
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple, Union

import numpy as np
import pandas as pd
import polars as pl
import psutil
import pyarrow as pa
from loguru import logger

# Announce the start of the run on the shared loguru logger.
logger.info("Starting decimal128 pipeline testing")

# Handle on the current Python process; its RSS is sampled by record_memory().
process = psutil.Process()
# Chronological (label, rss_in_gb) checkpoints collected during the run.
memory_samples: List[Tuple[str, float]] = []

def record_memory(label: str) -> float:
    """Sample this process's resident set size and store it under *label*.

    The sample is appended to ``memory_samples`` and logged.

    Args:
        label: Name of the checkpoint being measured (e.g. ``"start"``).

    Returns:
        Resident set size in GiB (bytes / 1024**3), logged with a "GB" suffix.
    """
    bytes_per_gb = 1024 ** 3
    rss_gb = process.memory_info().rss / bytes_per_gb
    memory_samples.append((label, rss_gb))
    logger.info(f"Memory usage at {label}: {rss_gb:.2f}GB")
    return rss_gb


record_memory("start")

## 1. Create Large Sample Dataset

First, we create a large sample dataset to test with. Since 10GB of real data is not available, we generate a synthetic sample with realistic price and quantity distributions. Its size is set by `sample_size` below (1M events by default).

In [None]:
def create_large_sample_data(
    num_events: int = 10_000_000,
    *,
    seed: int = 42,
    symbol: str = "BTC-USDT",
    base_timestamp_ns: Optional[int] = None,
) -> pl.DataFrame:
    """Build a synthetic trade-event DataFrame for decimal benchmarking.

    Prices are drawn from N(45000, 1000), clipped to [30000, 70000].
    Quantities are drawn from LogNormal(0, 1), clipped to [1e-8, 1000].
    Both are also rendered as fixed 8-decimal strings (``price_str`` /
    ``quantity_str``) so that downstream code can parse them into exact
    decimal or pips representations.

    Args:
        num_events: Number of rows to generate.
        seed: Seed for a private NumPy ``Generator``. The same seed yields the
            same prices, quantities and sides, and NumPy's global random
            state is left untouched.
        symbol: Value written to every row of the ``symbol`` column.
        base_timestamp_ns: Timestamp (ns) of the first event. Defaults to the
            current wall-clock time; pass a fixed value for fully
            reproducible output.

    Returns:
        DataFrame with columns event_id, timestamp, symbol, price_str,
        quantity_str, side, price_float, quantity_float.
    """
    logger.info(f"Creating sample data with {num_events} events")

    # Local generator: reproducible without mutating np.random's global state.
    rng = np.random.default_rng(seed)

    # BTC-USDT-like prices around 45000, clipped to reasonable bounds.
    prices = np.clip(rng.normal(45000, 1000, num_events), 30000, 70000)

    # Log-normal quantities (strictly positive, long right tail), clipped.
    quantities = np.clip(rng.lognormal(0, 1, num_events), 0.00000001, 1000)

    if base_timestamp_ns is None:
        base_timestamp_ns = time.time_ns()
    # Events 1 ms (1_000_000 ns) apart, computed vectorised in int64.
    timestamps = base_timestamp_ns + np.arange(num_events, dtype=np.int64) * 1_000_000

    data = {
        "event_id": np.arange(num_events, dtype=np.int64),
        "timestamp": timestamps,
        "symbol": [symbol] * num_events,
        # Fixed 8-decimal strings are the exact source for decimal/pips parsing.
        "price_str": [f"{price:.8f}" for price in prices],
        "quantity_str": [f"{qty:.8f}" for qty in quantities],
        "side": rng.choice(["buy", "sell"], num_events),
        "price_float": prices,
        "quantity_float": quantities,
    }

    df = pl.DataFrame(data)

    logger.info(f"Created DataFrame with {len(df)} rows and {len(df.columns)} columns")
    record_memory("after_sample_creation")

    return df

# Create the working sample. Start with 1M events and scale sample_size up
# if available memory allows.
sample_size = 1_000_000
logger.info(f"Creating sample with {sample_size} events")
sample_df = create_large_sample_data(sample_size)

print(f"Sample data shape: {sample_df.shape}")
# Plain string: the original f-string had no placeholders (ruff F541).
print("Sample data schema:")
print(sample_df.dtypes)

## 2. Test Polars Decimal128 Operations

Let's test converting to decimal128 and performing various operations.

In [None]:
def test_decimal128_operations(
    df: pl.DataFrame,
    *,
    precision: int = 38,
    scale: int = 8,
    output_dir: str = "data/test_sample",
) -> Dict[str, Any]:
    """Benchmark Polars Decimal (decimal128) conversion, arithmetic and Parquet I/O.

    Stages, each timed with ``time.perf_counter``:

    1. Cast ``price_str`` / ``quantity_str`` to ``pl.Decimal(precision, scale)``.
    2. Run whole-frame aggregations, a ``group_by("side")`` aggregation, and
       Decimal arithmetic (notional value and a 0.1% fee).
    3. Write to Parquet and read back. Verify that ``notional_value`` is still
       a Decimal dtype and that its sum is unchanged.

    Args:
        df: Frame with ``price_str``, ``quantity_str`` and ``side`` columns,
            e.g. the output of ``create_large_sample_data``.
        precision: Total number of decimal digits (decimal128 allows up to 38).
        scale: Number of fractional digits. Multiplying two Decimal columns
            adds their scales, so ``price * quantity`` has scale ``2 * scale``.
            The default of 8 matches the 8-decimal input strings. A scale of 18
            would give the product scale 36, leaving only 2 integer digits
            under precision 38. That is too few for notional values in the
            tens of thousands.
        output_dir: Directory for ``decimal_test.parquet``; created if missing.

    Returns:
        Dict with ``conversion_success`` / ``operations_success`` flags,
        ``performance_metrics`` (seconds per completed stage) and a list of
        ``errors``. Exceptions are logged and recorded, not raised.
    """
    results: Dict[str, Any] = {
        "conversion_success": False,
        "operations_success": False,
        "performance_metrics": {},
        "errors": [],
    }
    decimal_dtype = pl.Decimal(precision=precision, scale=scale)

    try:
        logger.info("Testing decimal128 conversion...")
        start_time = time.perf_counter()

        # Parse the exact string representations into decimal128 columns.
        df_decimal = df.with_columns([
            pl.col("price_str").cast(decimal_dtype).alias("price_decimal"),
            pl.col("quantity_str").cast(decimal_dtype).alias("quantity_decimal"),
        ])

        conversion_time = time.perf_counter() - start_time
        results["conversion_success"] = True
        results["performance_metrics"]["conversion_time"] = conversion_time

        record_memory("after_decimal_conversion")
        logger.info(f"Decimal conversion completed in {conversion_time:.2f}s")

        logger.info("Testing decimal128 operations...")
        start_time = time.perf_counter()

        # 1. Whole-frame aggregations
        agg_result = df_decimal.select([
            pl.col("price_decimal").mean().alias("avg_price"),
            pl.col("quantity_decimal").sum().alias("total_quantity"),
            pl.col("price_decimal").min().alias("min_price"),
            pl.col("price_decimal").max().alias("max_price"),
            pl.col("quantity_decimal").std().alias("qty_std"),
        ])
        logger.info(f"Aggregation results: {agg_result.to_dict()}")

        # 2. Group-by aggregations (pl.len() replaces the deprecated pl.count()).
        group_result = df_decimal.group_by("side").agg([
            pl.col("price_decimal").mean().alias("avg_price"),
            pl.col("quantity_decimal").sum().alias("total_quantity"),
            pl.len().alias("count"),
        ])
        logger.info(f"Group by results: {group_result.to_dict()}")

        # 3. Arithmetic. The fee factor is built as a Decimal literal with the
        # same string->Decimal cast used above, so the product stays Decimal.
        # A Python float literal would take the result out of exact decimal
        # arithmetic.
        fee_factor = pl.lit("1.001").cast(pl.Decimal(precision=precision, scale=3))
        df_with_calc = df_decimal.with_columns([
            (pl.col("price_decimal") * pl.col("quantity_decimal")).alias("notional_value"),
            (pl.col("price_decimal") * fee_factor).alias("price_with_fee"),
        ])

        operations_time = time.perf_counter() - start_time
        results["operations_success"] = True
        results["performance_metrics"]["operations_time"] = operations_time

        record_memory("after_decimal_operations")
        logger.info(f"Decimal operations completed in {operations_time:.2f}s")

        # 4. Parquet round trip
        logger.info("Testing Parquet I/O with decimal types...")
        parquet_path = Path(output_dir) / "decimal_test.parquet"
        # write_parquet does not create missing parent directories.
        parquet_path.parent.mkdir(parents=True, exist_ok=True)

        start_time = time.perf_counter()
        df_with_calc.write_parquet(parquet_path)
        df_read = pl.read_parquet(parquet_path)

        parquet_time = time.perf_counter() - start_time
        results["performance_metrics"]["parquet_io_time"] = parquet_time

        logger.info(f"Parquet I/O completed in {parquet_time:.2f}s")
        logger.info(f"Read schema: {df_read.dtypes}")

        # Success criterion: values must still be Decimal after the round trip,
        # not degraded to float/object.
        read_dtype = df_read.schema["notional_value"]
        if not isinstance(read_dtype, pl.Decimal):
            logger.error(f"❌ notional_value read back as {read_dtype}, expected Decimal")
            results["errors"].append(
                f"notional_value dtype after Parquet read is {read_dtype}, not Decimal"
            )

        # Decimal sums are exact, so compare them exactly rather than via float.
        original_sum = df_with_calc.select(pl.col("notional_value").sum()).item()
        read_sum = df_read.select(pl.col("notional_value").sum()).item()

        if original_sum == read_sum:
            logger.info("✅ Data integrity verified - sums match")
        else:
            logger.error(f"❌ Data integrity failed - sums differ: {original_sum} vs {read_sum}")
            results["errors"].append("Data integrity check failed")

        record_memory("after_parquet_io")

    except Exception as e:
        # Notebook-level boundary: record the failure so the comparison step
        # can fall back to the pips strategy; keep the traceback in the log.
        logger.exception(f"Decimal128 operation failed: {e}")
        results["errors"].append(str(e))

    return results

# Test decimal128 operations
decimal_results = test_decimal128_operations(sample_df)
print(f"Decimal128 test results: {json.dumps(decimal_results, indent=2, default=str)}")

## 3. Implement Int64 Pips Converter

Now let's implement the int64 pips strategy as a fallback.

In [None]:
class PipsConverter:
    """Convert decimal prices/quantities to and from int64 "pips".

    A pip value is the decimal value multiplied by ``10 ** decimals`` and
    rounded half-up (ties away from zero) to an integer. ``decimals`` is
    looked up per symbol in the tables below, with ``DEFAULT_DECIMALS`` for
    unknown symbols. Converting back divides by the same power of ten, so a
    round trip is exact to the configured number of decimals.
    """

    # Symbol-specific decimal places for prices.
    PRICE_DECIMALS: Dict[str, int] = {
        'BTC-USDT': 2,   # $0.01 precision
        'ETH-USDT': 2,   # $0.01 precision
        'SOL-USDT': 4,   # $0.0001 precision
        'SHIB-USDT': 8,  # $0.00000001 precision
    }

    # Symbol-specific decimal places for quantities.
    QUANTITY_DECIMALS: Dict[str, int] = {
        'BTC-USDT': 8,   # 0.00000001 BTC (1 satoshi)
        'ETH-USDT': 8,   # 0.00000001 ETH
        'SOL-USDT': 6,   # 0.000001 SOL
        'SHIB-USDT': 0,  # 1 SHIB (integer only)
    }

    # Decimal places used for symbols missing from the tables above.
    DEFAULT_DECIMALS = 8

    # Representable range of a signed 64-bit integer (Polars Int64).
    INT64_MIN = -(2 ** 63)
    INT64_MAX = 2 ** 63 - 1

    def __init__(self, symbol: str):
        self.symbol = symbol
        self.price_decimals = self.PRICE_DECIMALS.get(symbol, self.DEFAULT_DECIMALS)
        self.qty_decimals = self.QUANTITY_DECIMALS.get(symbol, self.DEFAULT_DECIMALS)
        self.price_multiplier = 10 ** self.price_decimals
        self.qty_multiplier = 10 ** self.qty_decimals

    @classmethod
    def _to_pips(cls, value: Union[str, int, float, Decimal], multiplier: int) -> int:
        """Scale *value* by *multiplier* and round half-up to an int64-range int.

        Raises:
            ValueError: if *value* is NaN or infinite.
            OverflowError: if the result does not fit in int64.
            decimal.InvalidOperation: if a string *value* is not a number.
        """
        if isinstance(value, float):
            # Use the shortest decimal repr so 1.005 means "1.005", not the
            # binary approximation 1.00499999999999989...
            value = str(value)
        decimal_value = Decimal(value)
        if not decimal_value.is_finite():
            raise ValueError(f"Cannot convert non-finite value {value!r} to pips")
        # to_integral_value (unlike quantize) does not raise when the integer
        # has more digits than the decimal context precision, so oversized
        # values reach the explicit int64 range check below.
        pips = int((decimal_value * multiplier).to_integral_value(rounding=ROUND_HALF_UP))
        if not cls.INT64_MIN <= pips <= cls.INT64_MAX:
            raise OverflowError(f"{value!r} x {multiplier} does not fit in int64")
        return pips

    def price_to_pips(self, price: Union[str, int, float, Decimal]) -> int:
        """Convert a price (str, int, Decimal, or float via its repr) to int64 pips."""
        return self._to_pips(price, self.price_multiplier)

    def pips_to_price(self, pips: int) -> Decimal:
        """Convert pips back to decimal price."""
        return Decimal(pips) / self.price_multiplier

    def quantity_to_pips(self, quantity: Union[str, int, float, Decimal]) -> int:
        """Convert a quantity (str, int, Decimal, or float via its repr) to int64 pips."""
        return self._to_pips(quantity, self.qty_multiplier)

    def pips_to_quantity(self, pips: int) -> Decimal:
        """Convert pips back to decimal quantity."""
        return Decimal(pips) / self.qty_multiplier

# Round-trip check for the pips converter on BTC-USDT.
converter = PipsConverter('BTC-USDT')

test_price = "45123.12345678"
test_quantity = "1.23456789"

price_pips = converter.price_to_pips(test_price)
recovered_price = converter.pips_to_price(price_pips)

qty_pips = converter.quantity_to_pips(test_quantity)
recovered_qty = converter.pips_to_quantity(qty_pips)

print(f"Price round-trip: {test_price} -> {price_pips} pips -> {recovered_price}")
print(f"Quantity round-trip: {test_quantity} -> {qty_pips} pips -> {recovered_qty}")

# Half-up rounding to the nearest pip moves a value by at most half a pip.
# Compare exactly in Decimal (no float conversion) against that bound, using
# the converter's own multipliers instead of re-reading the symbol tables.
price_tolerance = Decimal(1) / converter.price_multiplier / 2
qty_tolerance = Decimal(1) / converter.qty_multiplier / 2
price_precision_ok = abs(Decimal(test_price) - recovered_price) <= price_tolerance
qty_precision_ok = abs(Decimal(test_quantity) - recovered_qty) <= qty_tolerance

print(f"Price precision preserved: {price_precision_ok}")
print(f"Quantity precision preserved: {qty_precision_ok}")

## 4. Test Int64 Pips Performance

Let's test the performance of int64 pips operations.

In [None]:
def test_pips_operations(
    df: pl.DataFrame,
    *,
    symbol: str = "BTC-USDT",
    output_dir: str = "data/test_sample",
) -> Dict[str, Any]:
    """Benchmark the int64-pips strategy: conversion, arithmetic and Parquet I/O.

    Stages, each timed with ``time.perf_counter``:

    1. Convert ``price_str`` / ``quantity_str`` to Int64 pips with
       ``PipsConverter(symbol)``.
    2. Run whole-frame aggregations, a ``group_by("side")`` aggregation, and
       integer arithmetic (notional value and a 0.1% fee, both in price pips).
    3. Write to Parquet and read back. Verify that ``notional_value_pips`` is
       still Int64 and that its sum is unchanged.

    Args:
        df: Frame with ``price_str``, ``quantity_str`` and ``side`` columns,
            e.g. the output of ``create_large_sample_data``.
        symbol: Symbol whose decimal places the converter uses.
        output_dir: Directory for ``pips_test.parquet``; created if missing.

    Returns:
        Dict with ``conversion_success`` / ``operations_success`` flags,
        ``performance_metrics`` (seconds per completed stage) and a list of
        ``errors``. Exceptions are logged and recorded, not raised.
    """
    results: Dict[str, Any] = {
        "conversion_success": False,
        "operations_success": False,
        "performance_metrics": {},
        "errors": [],
    }

    try:
        logger.info("Testing int64 pips conversion...")
        start_time = time.perf_counter()

        converter = PipsConverter(symbol)

        # map_elements calls the Python converter once per row (it is not
        # vectorised), so this timing includes per-row Python overhead.
        df_pips = df.with_columns([
            pl.col("price_str")
            .map_elements(converter.price_to_pips, return_dtype=pl.Int64)
            .alias("price_pips"),
            pl.col("quantity_str")
            .map_elements(converter.quantity_to_pips, return_dtype=pl.Int64)
            .alias("quantity_pips"),
        ])

        conversion_time = time.perf_counter() - start_time
        results["conversion_success"] = True
        results["performance_metrics"]["conversion_time"] = conversion_time

        record_memory("after_pips_conversion")
        logger.info(f"Pips conversion completed in {conversion_time:.2f}s")

        logger.info("Testing int64 pips operations...")
        start_time = time.perf_counter()

        # 1. Whole-frame aggregations
        agg_result = df_pips.select([
            pl.col("price_pips").mean().alias("avg_price_pips"),
            pl.col("quantity_pips").sum().alias("total_quantity_pips"),
            pl.col("price_pips").min().alias("min_price_pips"),
            pl.col("price_pips").max().alias("max_price_pips"),
            pl.col("quantity_pips").std().alias("qty_std_pips"),
        ])
        logger.info(f"Aggregation results: {agg_result.to_dict()}")

        # 2. Group-by aggregations (pl.len() replaces the deprecated pl.count()).
        group_result = df_pips.group_by("side").agg([
            pl.col("price_pips").mean().alias("avg_price_pips"),
            pl.col("quantity_pips").sum().alias("total_quantity_pips"),
            pl.len().alias("count"),
        ])
        logger.info(f"Group by results: {group_result.to_dict()}")

        # 3. Integer arithmetic.
        # price_pips * quantity_pips is in (price pip x quantity pip) units;
        # floor-dividing by qty_multiplier converts it to price pips.
        # Floor division `//` keeps the columns Int64, whereas true division
        # `/` would produce Float64 and defeat the pips approach. For
        # non-negative inputs the floor truncates sub-pip remainders.
        # NOTE(review): the intermediate product must fit in int64. With 2
        # price and 8 quantity decimals this caps price*quantity at ~9.2e8;
        # revisit for symbols with more decimals.
        df_with_calc = df_pips.with_columns([
            (pl.col("price_pips") * pl.col("quantity_pips") // converter.qty_multiplier)
            .alias("notional_value_pips"),
            # +0.1% fee, floored to whole price pips.
            (pl.col("price_pips") * 1001 // 1000).alias("price_with_fee_pips"),
        ])

        operations_time = time.perf_counter() - start_time
        results["operations_success"] = True
        results["performance_metrics"]["operations_time"] = operations_time

        record_memory("after_pips_operations")
        logger.info(f"Pips operations completed in {operations_time:.2f}s")

        # 4. Parquet round trip
        logger.info("Testing Parquet I/O with int64 types...")
        parquet_path = Path(output_dir) / "pips_test.parquet"
        # write_parquet does not create missing parent directories.
        parquet_path.parent.mkdir(parents=True, exist_ok=True)

        start_time = time.perf_counter()
        df_with_calc.write_parquet(parquet_path)
        df_read = pl.read_parquet(parquet_path)

        parquet_time = time.perf_counter() - start_time
        results["performance_metrics"]["parquet_io_time"] = parquet_time

        logger.info(f"Parquet I/O completed in {parquet_time:.2f}s")
        logger.info(f"Read schema: {df_read.dtypes}")

        # The pips strategy relies on values staying Int64 end to end.
        read_dtype = df_read.schema["notional_value_pips"]
        if read_dtype != pl.Int64:
            logger.error(f"❌ notional_value_pips read back as {read_dtype}, expected Int64")
            results["errors"].append(
                f"notional_value_pips dtype after Parquet read is {read_dtype}, not Int64"
            )

        # Integer sums are exact, so exact equality is the right check.
        original_sum = df_with_calc.select(pl.col("notional_value_pips").sum()).item()
        read_sum = df_read.select(pl.col("notional_value_pips").sum()).item()

        if original_sum == read_sum:
            logger.info("✅ Data integrity verified - sums match exactly")
        else:
            logger.error(f"❌ Data integrity failed - sums differ: {original_sum} vs {read_sum}")
            results["errors"].append("Data integrity check failed")

        record_memory("after_pips_parquet_io")

    except Exception as e:
        # Notebook-level boundary: record the failure for the comparison step
        # and keep the traceback in the log.
        logger.exception(f"Pips operation failed: {e}")
        results["errors"].append(str(e))

    return results

# Test pips operations
pips_results = test_pips_operations(sample_df)
print(f"Pips test results: {json.dumps(pips_results, indent=2, default=str)}")

## 5. Performance Comparison

Let's compare the performance of both approaches.

In [None]:
def compare_performance(
    decimal_results: Dict,
    pips_results: Dict,
    precision_preserved: Optional[bool] = None,
) -> Dict[str, Any]:
    """Compare decimal128 and int64-pips benchmark results and pick an approach.

    Args:
        decimal_results: Output of ``test_decimal128_operations``.
        pips_results: Output of ``test_pips_operations``.
        precision_preserved: Outcome of the pips round-trip precision check.
            ``None`` (the default) records that no result was supplied,
            rather than claiming success.

    Returns:
        Dict with the following entries:

        - ``decimal128_viable`` / ``pips_viable``.
        - ``recommended_approach`` and ``recommendation_reason``.
        - ``performance_comparison``: ``<stage>_speedup`` ratios (decimal
          time / pips time, so > 1 means pips is faster) for each stage
          both runs completed.
        - ``memory_usage``: a snapshot of ``memory_samples``.
        - ``validation_results``.
    """
    comparison: Dict[str, Any] = {
        "decimal128_viable": decimal_results["conversion_success"] and decimal_results["operations_success"],
        "pips_viable": pips_results["conversion_success"] and pips_results["operations_success"],
        "recommended_approach": None,
        "performance_comparison": {},
        "memory_usage": {},
        "validation_results": {},
    }

    # A stage's timing is only recorded once that stage succeeds, so a ratio
    # is produced exactly for the stages both approaches completed. A zero
    # pips timing (coarse clock, tiny data) is skipped instead of dividing by 0.
    stage_metrics = {
        "conversion_speedup": "conversion_time",
        "operations_speedup": "operations_time",
        "parquet_io_speedup": "parquet_io_time",
    }
    decimal_metrics = decimal_results["performance_metrics"]
    pips_metrics = pips_results["performance_metrics"]
    for speedup_key, metric_key in stage_metrics.items():
        if metric_key in decimal_metrics and pips_metrics.get(metric_key, 0) > 0:
            comparison["performance_comparison"][speedup_key] = (
                decimal_metrics[metric_key] / pips_metrics[metric_key]
            )

    # Snapshot (copy) so later record_memory() calls do not alter this result.
    comparison["memory_usage"]["samples"] = list(memory_samples)

    # Determine recommended approach
    if comparison["decimal128_viable"] and not decimal_results["errors"]:
        comparison["recommended_approach"] = "decimal128"
        comparison["recommendation_reason"] = "Polars decimal128 operations successful without errors"
    elif comparison["pips_viable"] and not pips_results["errors"]:
        comparison["recommended_approach"] = "int64_pips"
        comparison["recommendation_reason"] = "Int64 pips operations successful, decimal128 had issues"
    else:
        comparison["recommended_approach"] = "fallback_required"
        comparison["recommendation_reason"] = "Both approaches had issues, need alternative strategy"

    comparison["validation_results"]["decimal128_stable"] = not decimal_results["errors"]
    comparison["validation_results"]["pips_stable"] = not pips_results["errors"]
    comparison["validation_results"]["precision_preserved"] = precision_preserved

    return comparison

# Compare performance, passing the real outcome of the round-trip precision check.
performance_comparison = compare_performance(
    decimal_results,
    pips_results,
    precision_preserved=price_precision_ok and qty_precision_ok,
)

print("\n=== PERFORMANCE COMPARISON ===")
print(f"Decimal128 viable: {performance_comparison['decimal128_viable']}")
print(f"Int64 pips viable: {performance_comparison['pips_viable']}")
print(f"Recommended approach: {performance_comparison['recommended_approach']}")
print(f"Reason: {performance_comparison['recommendation_reason']}")

speedup_labels = {
    "conversion_speedup": "Conversion speedup (decimal/pips)",
    "operations_speedup": "Operations speedup (decimal/pips)",
    "parquet_io_speedup": "Parquet I/O speedup (decimal/pips)",
}
for speedup_key, label in speedup_labels.items():
    if speedup_key in performance_comparison["performance_comparison"]:
        speedup = performance_comparison["performance_comparison"][speedup_key]
        print(f"{label}: {speedup:.2f}x")

print("\n=== MEMORY USAGE TIMELINE ===")
for label, memory_gb in memory_samples:
    print(f"{label}: {memory_gb:.2f}GB")

# Save detailed results
detailed_results = {
    "decimal128_results": decimal_results,
    "pips_results": pips_results,
    "performance_comparison": performance_comparison,
}

results_path = Path("data/test_sample/decimal_pipeline_results.json")
# open() does not create missing parent directories.
results_path.parent.mkdir(parents=True, exist_ok=True)
with results_path.open("w", encoding="utf-8") as f:
    json.dump(detailed_results, f, indent=2, default=str)

logger.info("Decimal pipeline testing completed")

## 6. Validation Summary

Based on the tests above, we can make a decision on which approach to use.

In [None]:
# Final validation summary
print("\n=== DECIMAL STRATEGY VALIDATION SUMMARY ===")
print(f"Test data size: {sample_size:,} events")
print(f"Peak memory usage: {max((mem for _, mem in memory_samples), default=0.0):.2f}GB")

recommended_approach = performance_comparison["recommended_approach"]
recommendation_reason = performance_comparison["recommendation_reason"]

# Validation passes only when a strategy was actually recommended, i.e. it
# completed every stage without recorded errors. "fallback_required" means
# neither did, even if both reached operations_success. The recommendation
# (not the *_viable flags) also decides which success message is printed.
validation_passed = recommended_approach in ("decimal128", "int64_pips")

if recommended_approach == "decimal128":
    print("✅ Polars decimal128 operations successful")
elif recommended_approach == "int64_pips":
    print("✅ Int64 pips operations successful (fallback strategy)")
else:
    print("❌ Both decimal strategies failed")

if validation_passed:
    print(f"\n🎉 VALIDATION PASSED: {recommended_approach} approach recommended")
    print(f"Reason: {recommendation_reason}")
else:
    print("\n❌ VALIDATION FAILED: Neither approach is viable, need alternative strategy")

next_steps_by_approach = {
    "decimal128": [
        "Implement decimal128 storage in unified schema",
        "Configure Parquet writer with decimal128 support",
        "Add decimal128 validation to data quality checks",
    ],
    "int64_pips": [
        "Implement PipsConverter in production code",
        "Configure symbol-specific decimal places",
        "Add pips conversion to data processing pipeline",
        "Document decimal-to-pips conversion strategy",
    ],
}
fallback_next_steps = [
    "Investigate PyArrow decimal128 as alternative",
    "Consider string storage with on-demand conversion",
    "Evaluate DuckDB as alternative processing engine",
]

# Final recommendation
final_recommendation = {
    "validation_passed": validation_passed,
    "recommended_approach": recommended_approach,
    "reason": recommendation_reason,
    "next_steps": next_steps_by_approach.get(recommended_approach, fallback_next_steps),
}

print(f"\nFinal recommendation: {json.dumps(final_recommendation, indent=2)}")

# Save final recommendation; open() does not create missing directories.
recommendation_path = Path("data/test_sample/decimal_strategy_recommendation.json")
recommendation_path.parent.mkdir(parents=True, exist_ok=True)
with recommendation_path.open("w", encoding="utf-8") as f:
    json.dump(final_recommendation, f, indent=2)

if validation_passed:
    print("\n✅ Decimal pipeline testing completed successfully")
else:
    print("\n⚠️ Decimal pipeline testing completed - validation failed")