In [None]:
'''
Here, Claude Sonnet 4 compared compression methods for saving and reading 20 years of 1-minute SPY bars.
Based on the test results, I selected Parquet with Zstd compression.

# Parquet Zstd (optimized) shows excellent balance:
# - File size: 67.2MB (3rd smallest)
# - Read speed: 0.071s (very fast)
# - Write speed: 1.062s (reasonable)
# - Best compression ratio vs speed trade-off

# Comparison with other top performers:

# Format          | Type      |    Write |     Read |      Size
# -------------------------------------------------------------
# Parquet None    | optimized |   1.013s |   0.048s |   101.7MB
# Parquet Snappy  | optimized |   0.927s |   0.066s |    85.8MB
# Parquet Gzip    | optimized |   4.971s |   0.096s |    61.5MB
# Parquet Zstd    | optimized |   1.062s |   0.071s |    67.2MB
# Parquet Brotli  | optimized |   9.540s |   0.114s |    57.4MB
# Parquet LZ4     | optimized |   0.932s |   0.067s |    85.2MB
# Feather         | optimized |   0.186s |   0.069s |    88.6MB

# Recommended storage format:
def save_optimized_bars(df, filepath):
    """Save DataFrame with optimized data types and compression"""
    # Optimize data types
    df_opt = df.copy()
    df_opt['timestamp'] = pd.to_datetime(df_opt['timestamp']).astype('int64') // 10**9
    df_opt['open'] = df_opt['open'].astype('float32')
    df_opt['high'] = df_opt['high'].astype('float32')
    df_opt['low'] = df_opt['low'].astype('float32')
    df_opt['close'] = df_opt['close'].astype('float32')
    df_opt['volume'] = df_opt['volume'].astype('uint32')
    df_opt['vwap'] = df_opt['vwap'].astype('float32')
    df_opt['transactions'] = df_opt['transactions'].astype('uint32')
    
    # Save with Zstd compression
    df_opt.to_parquet(filepath, engine='pyarrow', compression='zstd')

'''

In [None]:
!pip install pyarrow
!pip install tables

In [49]:
import pandas as pd
import numpy as np
import time
import os
import sys

# Put the parent of the current working directory on sys.path so the
# project-level `settings` module (used for settings.ABSOLUTE_DATA_DIR) can be
# imported. Presumably the kernel's cwd is this notebook's folder - confirm.
# The membership check keeps re-runs of this cell from appending duplicates.
parent_dir = os.path.dirname(os.path.abspath(''))
if parent_dir not in sys.path:
    sys.path.append(parent_dir)
import settings

def test_formats(df, dir_path, timestamp_type):
    """Test different file formats for saving and reading data"""
    results = []
    
    # Define test formats
    formats = [
        # CSV
        ('CSV', lambda: df.to_csv(os.path.join(dir_path, f'SPY_{timestamp_type}.csv'), index=False)),
        
        # Parquet with different compressions
        ('Parquet None', lambda: df.to_parquet(os.path.join(dir_path, f'SPY_{timestamp_type}_none.parquet'), 
                                               engine='pyarrow', compression=None)),
        ('Parquet Snappy', lambda: df.to_parquet(os.path.join(dir_path, f'SPY_{timestamp_type}_snappy.parquet'), 
                                                  engine='pyarrow', compression='snappy')),
        ('Parquet Gzip', lambda: df.to_parquet(os.path.join(dir_path, f'SPY_{timestamp_type}_gzip.parquet'), 
                                                engine='pyarrow', compression='gzip')),
        ('Parquet Zstd', lambda: df.to_parquet(os.path.join(dir_path, f'SPY_{timestamp_type}_zstd.parquet'), 
                                                engine='pyarrow', compression='zstd')),
        ('Parquet Brotli', lambda: df.to_parquet(os.path.join(dir_path, f'SPY_{timestamp_type}_brotli.parquet'), 
                                                  engine='pyarrow', compression='brotli')),
        ('Parquet LZ4', lambda: df.to_parquet(os.path.join(dir_path, f'SPY_{timestamp_type}_lz4.parquet'), 
                                               engine='pyarrow', compression='lz4')),
        
        # Other formats
        ('Feather', lambda: df.to_feather(os.path.join(dir_path, f'SPY_{timestamp_type}.feather'))),
        ('HDF5', lambda: df.to_hdf(os.path.join(dir_path, f'SPY_{timestamp_type}.hdf'), key='SPY', mode='w')),
        ('Numpy NPZ', lambda: np.savez_compressed(os.path.join(dir_path, f'SPY_{timestamp_type}.npz'), 
                                                   data=df.to_numpy(), columns=df.columns.to_numpy())),
    ]
    
    # Test writing speed and file size
    print(f"\n--- Writing Performance ({timestamp_type} timestamps) ---")
    for name, write_func in formats:
        try:
            start_time = time.time()
            write_func()
            write_time = time.time() - start_time
            
            # Get file size
            if 'CSV' in name:
                file_path = os.path.join(dir_path, f'SPY_{timestamp_type}.csv')
            elif 'Parquet' in name:
                compression = name.split()[-1].lower()
                file_path = os.path.join(dir_path, f'SPY_{timestamp_type}_{compression}.parquet')
            elif 'Feather' in name:
                file_path = os.path.join(dir_path, f'SPY_{timestamp_type}.feather')
            elif 'HDF5' in name:
                file_path = os.path.join(dir_path, f'SPY_{timestamp_type}.hdf')
            elif 'Numpy' in name:
                file_path = os.path.join(dir_path, f'SPY_{timestamp_type}.npz')
            
            file_size = os.path.getsize(file_path) / (1024 * 1024)  # MB
            
            results.append({
                'format': name,
                'write_time': write_time,
                'file_size': file_size,
                'timestamp_type': timestamp_type
            })
            
            print(f"{name:15} | Write: {write_time:.3f}s | Size: {file_size:.1f}MB")
            
        except Exception as e:
            print(f"{name:15} | Error: {e}")
    
    # Test reading speed
    print(f"\n--- Reading Performance ({timestamp_type} timestamps) ---")
    read_formats = [
        ('CSV', lambda: pd.read_csv(os.path.join(dir_path, f'SPY_{timestamp_type}.csv'))),
        ('Parquet None', lambda: pd.read_parquet(os.path.join(dir_path, f'SPY_{timestamp_type}_none.parquet'))),
        ('Parquet Snappy', lambda: pd.read_parquet(os.path.join(dir_path, f'SPY_{timestamp_type}_snappy.parquet'))),
        ('Parquet Gzip', lambda: pd.read_parquet(os.path.join(dir_path, f'SPY_{timestamp_type}_gzip.parquet'))),
        ('Parquet Zstd', lambda: pd.read_parquet(os.path.join(dir_path, f'SPY_{timestamp_type}_zstd.parquet'))),
        ('Parquet Brotli', lambda: pd.read_parquet(os.path.join(dir_path, f'SPY_{timestamp_type}_brotli.parquet'))),
        ('Parquet LZ4', lambda: pd.read_parquet(os.path.join(dir_path, f'SPY_{timestamp_type}_lz4.parquet'))),
        ('Feather', lambda: pd.read_feather(os.path.join(dir_path, f'SPY_{timestamp_type}.feather'))),
        ('HDF5', lambda: pd.read_hdf(os.path.join(dir_path, f'SPY_{timestamp_type}.hdf'), key='SPY')),
        ('Numpy NPZ', lambda: load_numpy_to_df(os.path.join(dir_path, f'SPY_{timestamp_type}.npz'))),
    ]
    
    for name, read_func in read_formats:
        try:
            start_time = time.time()
            df_loaded = read_func()
            read_time = time.time() - start_time
            
            # Update results
            for result in results:
                if result['format'] == name:
                    result['read_time'] = read_time
                    break
            
            print(f"{name:15} | Read: {read_time:.3f}s")
            
        except Exception as e:
            print(f"{name:15} | Error: {e}")
    
    return results

def load_numpy_to_df(file_path):
    """Load an ``.npz`` archive with ``data`` and ``columns`` arrays into a DataFrame.

    This matches the layout written by the 'Numpy NPZ' writer in
    ``test_formats`` (``data=df.to_numpy(), columns=df.columns.to_numpy()``).
    Those arrays are typically object arrays, which numpy only loads with
    ``allow_pickle=True``. Unpickling can execute arbitrary code, so only use
    this on trusted, locally written files.

    Returns a DataFrame whose columns keep the stored array's dtype (object
    for mixed-dtype data); no dtype inference is performed.
    """
    # np.load on an .npz returns an NpzFile that keeps the archive open until
    # closed; the context manager releases it once the arrays are read.
    with np.load(file_path, allow_pickle=True) as data:
        return pd.DataFrame(data['data'], columns=data['columns'])

# Main test execution
dir_path = os.path.join(settings.ABSOLUTE_DATA_DIR, 'bars', '1minute')
all_results = []

# Read the CSV once and derive every variant from it instead of re-reading the
# file per variant. test_formats only reads its input, and .assign/.astype
# below return new frames, so the variants can share this source safely.
df_string = pd.read_csv(os.path.join(dir_path, 'SPY.csv'))  # timestamps stay strings
parsed_timestamps = pd.to_datetime(df_string['timestamp'])
# Seconds since the epoch. NOTE(review): `// 10**9` assumes the parsed values
# have nanosecond resolution (the pandas 2.x default) - confirm on upgrade.
unix_seconds = parsed_timestamps.astype('int64') // 10**9

# Test with datetime objects (parsed dates)
print("=== TESTING WITH DATETIME TIMESTAMPS ===")
df_datetime = df_string.assign(timestamp=parsed_timestamps)
results_datetime = test_formats(df_datetime, dir_path, "datetime")
all_results.extend(results_datetime)

# Test with string timestamps (original)
print("\n=== TESTING WITH STRING TIMESTAMPS ===")
results_string = test_formats(df_string, dir_path, "string")
all_results.extend(results_string)

# Test with unix timestamps (converted to int64)
print("\n=== TESTING WITH UNIX TIMESTAMPS ===")
df_unix = df_string.assign(timestamp=unix_seconds)
results_unix = test_formats(df_unix, dir_path, "unix")
all_results.extend(results_unix)

# Test with optimized data types for disk storage: Unix timestamps plus 32-bit
# floats for prices and unsigned 32-bit ints for counts.
# NOTE(review): float32 keeps ~7 significant digits, uint32 truncates any
# fractional volume, and 'otc' stays float64 - check the loss is acceptable.
print("\n=== TESTING WITH OPTIMIZED DATA TYPES ===")
df_optimized = df_unix.astype({
    'open': 'float32',
    'high': 'float32',
    'low': 'float32',
    'close': 'float32',
    'volume': 'uint32',
    'vwap': 'float32',
    'transactions': 'uint32',
})

# Show data type info
print("Original data types:")
print(df_datetime.dtypes)
print(f"\nOriginal memory usage: {df_datetime.memory_usage(deep=True).sum() / (1024**2):.1f} MB")

print("\nOptimized data types:")
print(df_optimized.dtypes)
print(f"Optimized memory usage: {df_optimized.memory_usage(deep=True).sum() / (1024**2):.1f} MB")

results_optimized = test_formats(df_optimized, dir_path, "optimized")
all_results.extend(results_optimized)

# Print comprehensive summary
# Only formats that were both written and read back have a complete row.
complete_results = [r for r in all_results if 'read_time' in r]

print("\n=== COMPREHENSIVE SUMMARY ===")
# Type is 9 wide to fit 'optimized'; Size is 9 wide to fit e.g. ' 302.3MB'.
print(f"{'Format':15} | {'Type':9} | {'Write':>8} | {'Read':>8} | {'Size':>9}")
print("-" * 61)
for result in complete_results:
    print(f"{result['format']:15} | {result['timestamp_type']:9} | "
          f"{result['write_time']:7.3f}s | {result['read_time']:7.3f}s | "
          f"{result['file_size']:7.1f}MB")

# Print best performers by category
print("\n=== BEST PERFORMERS ===")
if not complete_results:
    print("No format was both written and read back successfully.")
else:
    # Best for file size
    best_size = min(complete_results, key=lambda r: r['file_size'])
    print(f"Smallest file: {best_size['format']} ({best_size['timestamp_type']}) - {best_size['file_size']:.1f}MB")

    # Best for read speed
    best_read = min(complete_results, key=lambda r: r['read_time'])
    print(f"Fastest read: {best_read['format']} ({best_read['timestamp_type']}) - {best_read['read_time']:.3f}s")

    # Best balance: minimize read_time * file_size, so a format only scores
    # well when it is both fast AND small. A read_time / file_size ratio would
    # instead reward large files (e.g. uncompressed HDF5).
    for result in complete_results:
        result['efficiency'] = result['read_time'] * result['file_size']
    best_balance = min(complete_results, key=lambda r: r['efficiency'])
    print(f"Best balance: {best_balance['format']} ({best_balance['timestamp_type']}) - "
          f"{best_balance['read_time']:.3f}s / {best_balance['file_size']:.1f}MB")

=== TESTING WITH DATETIME TIMESTAMPS ===

--- Writing Performance (datetime timestamps) ---
CSV             | Write: 15.965s | Size: 260.2MB
Parquet None    | Write: 0.759s | Size: 131.2MB
Parquet Snappy  | Write: 0.870s | Size: 103.2MB
Parquet Gzip    | Write: 6.708s | Size: 74.0MB
Parquet Zstd    | Write: 0.960s | Size: 90.1MB
Parquet Brotli  | Write: 10.537s | Size: 68.9MB
Parquet LZ4     | Write: 0.865s | Size: 104.9MB
Feather         | Write: 0.236s | Size: 125.1MB
HDF5            | Write: 0.344s | Size: 302.3MB
Numpy NPZ       | Write: 19.961s | Size: 91.3MB

--- Reading Performance (datetime timestamps) ---
CSV             | Read: 2.145s
Parquet None    | Read: 0.119s
Parquet Snappy  | Read: 0.112s
Parquet Gzip    | Read: 0.112s
Parquet Zstd    | Read: 0.085s
Parquet Brotli  | Read: 0.151s
Parquet LZ4     | Read: 0.087s
Feather         | Read: 0.082s
HDF5            | Read: 0.202s
Numpy NPZ       | Read: 12.719s

=== TESTING WITH STRING TIMESTAMPS ===

--- Writing Performance (s

In [50]:
# Data integrity test - verify that parquet saves and loads data correctly
print("=== DATA INTEGRITY TEST ===")

# Read original CSV data
df_original = pd.read_csv(os.path.join(dir_path, 'SPY.csv'))
print(f"Original CSV shape: {df_original.shape}")
print(f"Original data types:\n{df_original.dtypes}")

# Optimize data types (same conversions as the benchmark cell): Unix seconds
# for the timestamp, 32-bit floats for prices, unsigned 32-bit ints for counts.
df_optimized = df_original.assign(
    timestamp=pd.to_datetime(df_original['timestamp']).astype('int64') // 10**9
).astype({
    'open': 'float32',
    'high': 'float32',
    'low': 'float32',
    'close': 'float32',
    'volume': 'uint32',
    'vwap': 'float32',
    'transactions': 'uint32',
})

print(f"\nOptimized data types:\n{df_optimized.dtypes}")

# Round-trip through parquet with zstd compression. The temporary file is
# removed in `finally`, so a failed write or read does not leave it behind.
test_parquet_path = os.path.join(dir_path, 'SPY_integrity_test.parquet')
try:
    df_optimized.to_parquet(test_parquet_path, engine='pyarrow', compression='zstd')
    print(f"Saved to: {test_parquet_path}")
    df_loaded = pd.read_parquet(test_parquet_path)
finally:
    if os.path.exists(test_parquet_path):
        os.remove(test_parquet_path)
        print(f"Test file removed: {test_parquet_path}")

print(f"Loaded parquet shape: {df_loaded.shape}")
print(f"Loaded data types:\n{df_loaded.dtypes}")

# Check if shapes match
shapes_match = df_optimized.shape == df_loaded.shape
if shapes_match:
    print("✓ Shapes match")
else:
    print(f"ERROR: Shape mismatch! Original: {df_optimized.shape}, Loaded: {df_loaded.shape}")

# Check if column names match
columns_match = list(df_optimized.columns) == list(df_loaded.columns)
if columns_match:
    print("✓ Column names match")
else:
    print("ERROR: Column names mismatch!")
    print(f"Original columns: {list(df_optimized.columns)}")
    print(f"Loaded columns: {list(df_loaded.columns)}")

# Check if data types match (columns missing from the loaded frame were
# already reported by the column check)
dtype_match = True
for col in df_optimized.columns:
    if col in df_loaded.columns and df_optimized[col].dtype != df_loaded[col].dtype:
        print(f"ERROR: Data type mismatch for column '{col}'!")
        print(f"Original: {df_optimized[col].dtype}, Loaded: {df_loaded[col].dtype}")
        dtype_match = False

if dtype_match:
    print("✓ Data types match")

# Check values with one vectorized comparison per column. A Python loop over
# df.iloc[idx] builds a new Series for every cell (millions of rows x 9
# columns here) and is far too slow. It also upcasts each row to a common
# dtype, which would put the int64 timestamps under a float tolerance.
# Parquet stores values losslessly, so exact equality is expected; positions
# missing on both sides count as equal.
print("\nChecking values column by column...")
mismatch_found = False
total_rows = len(df_optimized)
if shapes_match and columns_match:
    for col in df_optimized.columns:
        original_vals = df_optimized[col].to_numpy()
        loaded_vals = df_loaded[col].to_numpy()
        equal = (original_vals == loaded_vals) | (pd.isna(original_vals) & pd.isna(loaded_vals))
        bad_rows = np.flatnonzero(~equal)
        if bad_rows.size:
            first = bad_rows[0]
            print(f"ERROR: {bad_rows.size:,} value mismatches in column '{col}', first at row {first}!")
            print(f"Original: {original_vals[first]} (type: {type(original_vals[first])})")
            print(f"Loaded: {loaded_vals[first]} (type: {type(loaded_vals[first])})")
            mismatch_found = True

    if not mismatch_found:
        print(f"✓ All {total_rows:,} rows checked - no value mismatches found!")
        print("✓ DATA INTEGRITY VERIFIED - Parquet format preserves data correctly")
    else:
        print("✗ DATA INTEGRITY FAILED - Found value mismatches")
else:
    print("✗ DATA INTEGRITY FAILED - shapes or columns differ, values not compared")

numeric_cols = ['open', 'high', 'low', 'close', 'volume', 'vwap', 'transactions']

# The round trip above only compares optimized vs. loaded data. Precision lost
# by downcasting from the CSV dtypes happens before saving, so measure it
# separately against the original CSV values.
print("\n=== DOWNCAST LOSS (CSV vs optimized) ===")
for col in numeric_cols:
    if col in df_original.columns:
        loss = (df_original[col].astype('float64') - df_optimized[col].astype('float64')).abs()
        print(f"{col:12} - Max abs diff: {loss.max():.3e}")

# Additional statistics comparison
print("\n=== STATISTICAL COMPARISON ===")
for col in numeric_cols:
    if col in df_optimized.columns and col in df_loaded.columns:
        orig_sum = df_optimized[col].sum()
        loaded_sum = df_loaded[col].sum()
        print(f"{col:12} - Original sum: {orig_sum:15.2f}, Loaded sum: {loaded_sum:15.2f}, Diff: {abs(orig_sum - loaded_sum):.2e}")

=== DATA INTEGRITY TEST ===
Original CSV shape: (3961817, 9)
Original data types:
timestamp        object
open            float64
high            float64
low             float64
close           float64
volume          float64
vwap            float64
transactions      int64
otc             float64
dtype: object

Optimized data types:
timestamp         int64
open            float32
high            float32
low             float32
close           float32
volume           uint32
vwap            float32
transactions     uint32
otc             float64
dtype: object
Saved to: /home/stan/src/download-polygon-aggregates/../polygon-data/bars/1minute/SPY_integrity_test.parquet
Loaded parquet shape: (3961817, 9)
Loaded data types:
timestamp         int64
open            float32
high            float32
low             float32
close           float32
volume           uint32
vwap            float32
transactions     uint32
otc             float64
dtype: object
✓ Shapes match
✓ Column names match
✓ Data

KeyboardInterrupt: 