# Fractal Time Series Compression: Interactive Demo

This notebook demonstrates three fractal-based compression methods for time series data:

1. **Iterated Function Systems (IFS)** - Uses contractive transformations
2. **Fractal Coding** - Block-based compression with self-similarity
3. **Fractal Interpolation** - Advanced decompression using FIFs

We'll explore different types of time series and compare compression performance with detailed visualizations.

In [None]:
# Setup and imports
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
import warnings
from typing import Dict, Tuple, List
import time

# Configure plotting
plt.style.use('seaborn-v0_8')
plt.rcParams['figure.figsize'] = (12, 8)
plt.rcParams['font.size'] = 10
sns.set_palette("husl")
warnings.filterwarnings('ignore')

# Import our fractal compression modules
import sys
import os
sys.path.append('./src')

from data.generator import TimeSeriesGenerator
from data.loader import TimeSeriesLoader
from compression.ifs_compression import IFSCompressor
from compression.fractal_coding import FractalCodingCompressor
from decompression.fractal_interpolation import FractalInterpolationDecompressor
from utils.metrics import CompressionMetrics
from utils.plotting import CompressionVisualizer

print("✅ All modules imported successfully!")
print("📊 Ready to demonstrate fractal compression methods")

## 1. Data Generation

Let's start by generating different types of time series to test our compression methods.

In [None]:
def generate_test_datasets(n_points=1000):
    """Generate various types of time series for testing."""
    datasets = {}
    
    # 1. Simple sine wave
    t, y = TimeSeriesGenerator.sine_wave(n_points=n_points, frequency=2.0, amplitude=1.0, noise_level=0.05)
    datasets['Simple Sine'] = (t, y)
    
    # 2. Multi-component signal
    components = [
        {'type': 'sine', 'frequency': 1.0, 'amplitude': 1.0},
        {'type': 'sine', 'frequency': 3.0, 'amplitude': 0.5},
        {'type': 'sine', 'frequency': 7.0, 'amplitude': 0.3},
        {'type': 'sine', 'frequency': 15.0, 'amplitude': 0.1}
    ]
    t, y = TimeSeriesGenerator.multi_component_series(n_points=n_points, components=components)
    datasets['Multi-Sine'] = (t, y)
    
    # 3. Fractal Brownian motion
    t, y = TimeSeriesGenerator.fractal_brownian_motion(n_points=n_points, hurst=0.7, scale=1.0)
    datasets['Fractal Brownian'] = (t, y)
    
    # 4. Stock price simulation
    t, y = TimeSeriesGenerator.stock_price_simulation(n_points=n_points, volatility=0.2)
    datasets['Stock Price'] = (t, y)
    
    # 5. Random walk
    t, y = TimeSeriesGenerator.random_walk(n_points=n_points, step_size=0.1, drift=0.01)
    datasets['Random Walk'] = (t, y)
    
    # Normalize all datasets
    for name, (t, y) in datasets.items():
        t_norm, y_norm = TimeSeriesLoader.preprocess_data(t, y, normalize=True)
        datasets[name] = (t_norm, y_norm)
    
    return datasets

# Generate test datasets
print("🔄 Generating test datasets...")
datasets = generate_test_datasets(n_points=800)
print(f"✅ Generated {len(datasets)} different time series types")

# Display dataset characteristics
for name, (t, y) in datasets.items():
    print(f"   {name}: {len(y)} points, range [{np.min(y):.3f}, {np.max(y):.3f}], std={np.std(y):.3f}")

In [None]:
# Visualize all generated datasets
fig, axes = plt.subplots(3, 2, figsize=(15, 12))
axes = axes.flatten()

for i, (name, (t, y)) in enumerate(datasets.items()):
    if i < len(axes):
        axes[i].plot(t, y, linewidth=1.5, alpha=0.8)
        axes[i].set_title(f'{name} Time Series', fontsize=12, fontweight='bold')
        axes[i].set_xlabel('Time')
        axes[i].set_ylabel('Normalized Value')
        axes[i].grid(True, alpha=0.3)
        
        # Add statistics
        mean_val = np.mean(y)
        std_val = np.std(y)
        axes[i].text(0.02, 0.95, f'μ={mean_val:.3f}\nσ={std_val:.3f}', 
                    transform=axes[i].transAxes, verticalalignment='top',
                    bbox=dict(boxstyle='round', facecolor='white', alpha=0.8))

# Remove empty subplot
if len(datasets) < len(axes):
    axes[-1].remove()

plt.suptitle('Generated Time Series Datasets for Compression Testing', fontsize=16, fontweight='bold')
plt.tight_layout()
plt.show()

## 2. Compression Method Setup

Now let's set up our three compression methods with optimized parameters.

In [None]:
# Initialize compression methods
compressors = {
    'IFS': IFSCompressor(
        n_transformations=4,
        max_iterations=100,
        contractivity_bound=0.9
    ),
    'Fractal Coding': FractalCodingCompressor(
        range_block_size=8,
        domain_block_size=16,
        overlap_factor=0.5
    )
}

# Initialize fractal interpolation decompressor
fif_decompressor = FractalInterpolationDecompressor()

# Initialize metrics calculator and visualizer
metrics_calc = CompressionMetrics()
visualizer = CompressionVisualizer()

print("🔧 Compression methods initialized:")
for name, compressor in compressors.items():
    print(f"   ✅ {name}: {compressor.__class__.__name__}")
print(f"   ✅ Fractal Interpolation Decompressor: {fif_decompressor.__class__.__name__}")

## 3. Single Dataset Detailed Analysis

Let's start with a detailed analysis of one dataset to show the complete compression pipeline.

In [None]:
# Choose the multi-sine dataset for detailed analysis
selected_dataset = 'Multi-Sine'
time_data, value_data = datasets[selected_dataset]

print(f"🔍 Detailed Analysis: {selected_dataset}")
print(f"   Data points: {len(value_data)}")
print(f"   Original size: {len(value_data) * 8} bytes (assuming 8 bytes per float)")
print(f"   Time range: [{time_data[0]:.3f}, {time_data[-1]:.3f}]")
print(f"   Value range: [{np.min(value_data):.3f}, {np.max(value_data):.3f}]")

In [None]:
def compress_and_analyze(method_name, compressor, time_data, value_data, use_fif=False):
    """Compress data and analyze results."""
    print(f"\n🔄 Testing {method_name}...")
    
    try:
        # Compression
        start_time = time.time()
        comp_result = compressor.compress(time_data, value_data)
        comp_time = time.time() - start_time
        
        print(f"   ✅ Compression completed in {comp_time:.3f}s")
        print(f"   📊 Compression ratio: {comp_result.compression_ratio:.2f}x")
        print(f"   💾 Space savings: {comp_result.space_savings:.1f}%")
        
        # Decompression - try both methods
        results = {}
        
        # Native decompression
        start_time = time.time()
        native_decomp = compressor.decompress(comp_result)
        native_time = time.time() - start_time
        
        native_metrics = metrics_calc.comprehensive_evaluation(
            value_data, native_decomp.reconstructed_data,
            comp_result.original_size, comp_result.compressed_size,
            comp_result.compression_time, native_decomp.decompression_time
        )
        
        results['Native'] = {
            'reconstructed': native_decomp.reconstructed_data,
            'metrics': native_metrics,
            'time': native_time
        }
        
        print(f"   🔄 Native decompression: {native_time:.3f}s, correlation={native_metrics['pearson_correlation']:.4f}")
        
        # Fractal Interpolation decompression
        if use_fif:
            try:
                start_time = time.time()
                fif_decomp = fif_decompressor.decompress_with_fif(
                    comp_result.compressed_data, 
                    len(value_data),
                    (time_data[0], time_data[-1])
                )
                fif_time = time.time() - start_time
                
                fif_metrics = metrics_calc.comprehensive_evaluation(
                    value_data, fif_decomp.reconstructed_data,
                    comp_result.original_size, comp_result.compressed_size,
                    comp_result.compression_time, fif_decomp.decompression_time
                )
                
                results['FIF'] = {
                    'reconstructed': fif_decomp.reconstructed_data,
                    'metrics': fif_metrics,
                    'time': fif_time
                }
                
                print(f"   🔄 FIF decompression: {fif_time:.3f}s, correlation={fif_metrics['pearson_correlation']:.4f}")
                
            except Exception as e:
                print(f"   ⚠️  FIF decompression failed: {e}")
        
        return comp_result, results
        
    except Exception as e:
        print(f"   ❌ {method_name} failed: {e}")
        return None, None

# Test all compression methods on the selected dataset
compression_results = {}

for method_name, compressor in compressors.items():
    comp_result, decomp_results = compress_and_analyze(
        method_name, compressor, time_data, value_data, use_fif=True
    )
    if comp_result is not None:
        compression_results[method_name] = {
            'compression': comp_result,
            'decompression': decomp_results
        }

## 4. Detailed Before/After Comparison Visualizations

In [None]:
# Create comprehensive before/after comparison
def plot_detailed_comparison(time_data, original_data, compression_results, dataset_name):
    """Create detailed before/after comparison plots."""
    
    n_methods = len(compression_results)
    fig = plt.figure(figsize=(20, 15))
    
    # Create grid layout
    gs = fig.add_gridspec(4, 3, hspace=0.3, wspace=0.3)
    
    # Original signal (top row, spanning all columns)
    ax_orig = fig.add_subplot(gs[0, :])
    ax_orig.plot(time_data, original_data, 'b-', linewidth=2, label='Original Signal')
    ax_orig.set_title(f'Original {dataset_name} Time Series', fontsize=16, fontweight='bold')
    ax_orig.set_ylabel('Normalized Value', fontsize=12)
    ax_orig.grid(True, alpha=0.3)
    ax_orig.legend(fontsize=12)
    
    # Add original signal statistics
    orig_stats = f'Length: {len(original_data)}\nMean: {np.mean(original_data):.4f}\nStd: {np.std(original_data):.4f}'
    ax_orig.text(0.02, 0.95, orig_stats, transform=ax_orig.transAxes, 
                verticalalignment='top', fontsize=10,
                bbox=dict(boxstyle='round', facecolor='lightblue', alpha=0.8))
    
    # Compression results (remaining rows)
    method_colors = ['red', 'green', 'orange', 'purple']
    
    row = 1
    for method_idx, (method_name, results) in enumerate(compression_results.items()):
        color = method_colors[method_idx % len(method_colors)]
        
        # Reconstruction comparison
        ax_recon = fig.add_subplot(gs[row, 0])
        ax_recon.plot(time_data, original_data, 'b-', alpha=0.7, linewidth=1.5, label='Original')
        
        # Plot native reconstruction
        if 'Native' in results['decompression']:
            native_recon = results['decompression']['Native']['reconstructed']
            ax_recon.plot(time_data, native_recon, '--', color=color, 
                         linewidth=2, alpha=0.8, label=f'{method_name} (Native)')
        
        # Plot FIF reconstruction if available
        if 'FIF' in results['decompression']:
            fif_recon = results['decompression']['FIF']['reconstructed']
            ax_recon.plot(time_data, fif_recon, ':', color=color, 
                         linewidth=2, alpha=0.8, label=f'{method_name} (FIF)')
        
        ax_recon.set_title(f'{method_name} Reconstruction', fontsize=12, fontweight='bold')
        ax_recon.set_ylabel('Value', fontsize=10)
        ax_recon.grid(True, alpha=0.3)
        ax_recon.legend(fontsize=9)
        
        # Error plot
        ax_error = fig.add_subplot(gs[row, 1])
        
        if 'Native' in results['decompression']:
            native_recon = results['decompression']['Native']['reconstructed']
            error = original_data - native_recon
            ax_error.plot(time_data, error, '-', color=color, linewidth=1.5, alpha=0.8, label='Native Error')
        
        if 'FIF' in results['decompression']:
            fif_recon = results['decompression']['FIF']['reconstructed']
            fif_error = original_data - fif_recon
            ax_error.plot(time_data, fif_error, '--', color=color, linewidth=1.5, alpha=0.8, label='FIF Error')
        
        ax_error.axhline(y=0, color='k', linestyle='-', alpha=0.3)
        ax_error.set_title(f'{method_name} Reconstruction Error', fontsize=12, fontweight='bold')
        ax_error.set_ylabel('Error', fontsize=10)
        ax_error.grid(True, alpha=0.3)
        ax_error.legend(fontsize=9)
        
        # Metrics summary
        ax_metrics = fig.add_subplot(gs[row, 2])
        ax_metrics.axis('off')
        
        # Prepare metrics text
        comp_metrics = results['compression']
        
        metrics_text = f"COMPRESSION METRICS\n\n"
        metrics_text += f"Ratio: {comp_metrics.compression_ratio:.2f}x\n"
        metrics_text += f"Space Savings: {comp_metrics.space_savings:.1f}%\n"
        metrics_text += f"Comp. Time: {comp_metrics.compression_time:.3f}s\n\n"
        
        if 'Native' in results['decompression']:
            native_metrics = results['decompression']['Native']['metrics']
            metrics_text += f"NATIVE DECOMPRESSION\n"
            metrics_text += f"RMSE: {native_metrics['rmse']:.4f}\n"
            metrics_text += f"Correlation: {native_metrics['pearson_correlation']:.4f}\n"
            metrics_text += f"SNR: {native_metrics['snr_db']:.1f} dB\n"
            metrics_text += f"SSIM: {native_metrics['ssim']:.4f}\n"
            metrics_text += f"Time: {results['decompression']['Native']['time']:.3f}s\n\n"
        
        if 'FIF' in results['decompression']:
            fif_metrics = results['decompression']['FIF']['metrics']
            metrics_text += f"FIF DECOMPRESSION\n"
            metrics_text += f"RMSE: {fif_metrics['rmse']:.4f}\n"
            metrics_text += f"Correlation: {fif_metrics['pearson_correlation']:.4f}\n"
            metrics_text += f"SNR: {fif_metrics['snr_db']:.1f} dB\n"
            metrics_text += f"SSIM: {fif_metrics['ssim']:.4f}\n"
            metrics_text += f"Time: {results['decompression']['FIF']['time']:.3f}s"
        
        ax_metrics.text(0.05, 0.95, metrics_text, transform=ax_metrics.transAxes,
                       verticalalignment='top', fontfamily='monospace', fontsize=9,
                       bbox=dict(boxstyle='round', facecolor='lightgray', alpha=0.8))
        
        row += 1
    
    plt.suptitle(f'Detailed Compression Analysis: {dataset_name}', fontsize=18, fontweight='bold')
    return fig

# Create the detailed comparison plot
if compression_results:
    fig = plot_detailed_comparison(time_data, value_data, compression_results, selected_dataset)
    plt.show()
else:
    print("❌ No compression results to display")

## 5. Frequency Domain Analysis

In [None]:
# Frequency domain analysis for the selected dataset
def plot_frequency_analysis(time_data, original_data, compression_results):
    """Plot frequency domain analysis."""
    
    fig, axes = plt.subplots(2, 2, figsize=(16, 10))
    
    # Compute FFT of original signal
    fft_original = np.fft.fft(original_data)
    freqs = np.fft.fftfreq(len(original_data), d=time_data[1]-time_data[0])
    
    # Only plot positive frequencies
    pos_mask = freqs > 0
    freqs_pos = freqs[pos_mask]
    fft_orig_pos = fft_original[pos_mask]
    
    # Plot 1: Original magnitude spectrum
    axes[0,0].loglog(freqs_pos, np.abs(fft_orig_pos), 'b-', linewidth=2, label='Original')
    axes[0,0].set_title('Magnitude Spectrum Comparison', fontsize=12, fontweight='bold')
    axes[0,0].set_xlabel('Frequency')
    axes[0,0].set_ylabel('Magnitude')
    axes[0,0].grid(True, alpha=0.3)
    
    # Plot 2: Phase spectrum
    axes[0,1].plot(freqs_pos, np.angle(fft_orig_pos), 'b-', linewidth=2, alpha=0.7, label='Original')
    axes[0,1].set_title('Phase Spectrum Comparison', fontsize=12, fontweight='bold')
    axes[0,1].set_xlabel('Frequency')
    axes[0,1].set_ylabel('Phase (radians)')
    axes[0,1].grid(True, alpha=0.3)
    
    # Plot reconstructed signals' frequency content
    colors = ['red', 'green', 'orange', 'purple']
    
    for method_idx, (method_name, results) in enumerate(compression_results.items()):
        color = colors[method_idx % len(colors)]
        
        if 'Native' in results['decompression']:
            recon_data = results['decompression']['Native']['reconstructed']
            fft_recon = np.fft.fft(recon_data)
            fft_recon_pos = fft_recon[pos_mask]
            
            # Magnitude spectrum
            axes[0,0].loglog(freqs_pos, np.abs(fft_recon_pos), '--', 
                           color=color, linewidth=2, alpha=0.8, label=f'{method_name}')
            
            # Phase spectrum
            axes[0,1].plot(freqs_pos, np.angle(fft_recon_pos), '--', 
                          color=color, linewidth=2, alpha=0.8, label=f'{method_name}')
    
    axes[0,0].legend()
    axes[0,1].legend()
    
    # Plot 3: Magnitude error
    for method_idx, (method_name, results) in enumerate(compression_results.items()):
        color = colors[method_idx % len(colors)]
        
        if 'Native' in results['decompression']:
            recon_data = results['decompression']['Native']['reconstructed']
            fft_recon = np.fft.fft(recon_data)
            fft_recon_pos = fft_recon[pos_mask]
            
            mag_error = np.abs(np.abs(fft_orig_pos) - np.abs(fft_recon_pos))
            axes[1,0].semilogy(freqs_pos, mag_error, '-', color=color, 
                             linewidth=2, alpha=0.8, label=f'{method_name}')
    
    axes[1,0].set_title('Magnitude Reconstruction Error', fontsize=12, fontweight='bold')
    axes[1,0].set_xlabel('Frequency')
    axes[1,0].set_ylabel('Magnitude Error')
    axes[1,0].grid(True, alpha=0.3)
    axes[1,0].legend()
    
    # Plot 4: Frequency domain metrics
    axes[1,1].axis('off')
    
    freq_metrics_text = "FREQUENCY DOMAIN METRICS\n\n"
    
    for method_name, results in compression_results.items():
        if 'Native' in results['decompression']:
            recon_data = results['decompression']['Native']['reconstructed']
            freq_metrics = metrics_calc.frequency_domain_error(original_data, recon_data)
            
            freq_metrics_text += f"{method_name}:\n"
            freq_metrics_text += f"  Mag Error: {freq_metrics['magnitude_error']:.4f}\n"
            freq_metrics_text += f"  Phase Error: {freq_metrics['phase_error']:.4f}\n"
            freq_metrics_text += f"  Spectral Corr: {freq_metrics['spectral_correlation']:.4f}\n\n"
    
    axes[1,1].text(0.05, 0.95, freq_metrics_text, transform=axes[1,1].transAxes,
                  verticalalignment='top', fontfamily='monospace', fontsize=10,
                  bbox=dict(boxstyle='round', facecolor='lightgreen', alpha=0.8))
    
    plt.suptitle(f'Frequency Domain Analysis: {selected_dataset}', fontsize=16, fontweight='bold')
    plt.tight_layout()
    return fig

if compression_results:
    freq_fig = plot_frequency_analysis(time_data, value_data, compression_results)
    plt.show()

## 6. Comprehensive Dataset Comparison

Now let's test all compression methods on all datasets and create a comprehensive comparison.

In [None]:
# Test all methods on all datasets
def test_all_combinations():
    """Test all compression methods on all datasets."""
    
    all_results = {}
    
    for dataset_name, (t, y) in datasets.items():
        print(f"\n🔄 Testing dataset: {dataset_name}")
        dataset_results = {}
        
        for method_name, compressor in compressors.items():
            try:
                print(f"   Testing {method_name}...", end=" ")
                
                # Compress
                comp_result = compressor.compress(t, y)
                
                # Decompress (native method)
                decomp_result = compressor.decompress(comp_result)
                
                # Calculate metrics
                metrics = metrics_calc.comprehensive_evaluation(
                    y, decomp_result.reconstructed_data,
                    comp_result.original_size, comp_result.compressed_size,
                    comp_result.compression_time, decomp_result.decompression_time
                )
                
                dataset_results[method_name] = {
                    'metrics': metrics,
                    'reconstructed': decomp_result.reconstructed_data,
                    'original': y
                }
                
                print(f"✅ Ratio: {metrics['compression_ratio']:.2f}x, Corr: {metrics['pearson_correlation']:.3f}")
                
            except Exception as e:
                print(f"❌ Failed: {str(e)[:50]}...")
                dataset_results[method_name] = {'error': str(e)}
        
        all_results[dataset_name] = dataset_results
    
    return all_results

# Run comprehensive testing
print("🚀 Starting comprehensive testing of all methods on all datasets...")
all_results = test_all_combinations()
print("\n✅ Comprehensive testing completed!")

In [None]:
# Create summary comparison table
def create_results_dataframe(all_results):
    """Create a pandas DataFrame summarizing all results."""
    
    rows = []
    
    for dataset_name, dataset_results in all_results.items():
        for method_name, method_results in dataset_results.items():
            if 'error' not in method_results:
                metrics = method_results['metrics']
                row = {
                    'Dataset': dataset_name,
                    'Method': method_name,
                    'Compression Ratio': metrics['compression_ratio'],
                    'Space Savings (%)': metrics['space_savings_percent'],
                    'RMSE': metrics['rmse'],
                    'Correlation': metrics['pearson_correlation'],
                    'SNR (dB)': metrics['snr_db'],
                    'PSNR (dB)': metrics['psnr_db'],
                    'SSIM': metrics['ssim'],
                    'Total Time (s)': metrics['total_time'],
                    'Fractal Dim Error': metrics.get('fractal_fractal_dimension_error', 0),
                    'Hurst Error': metrics.get('fractal_hurst_exponent_error', 0)
                }
                rows.append(row)
    
    return pd.DataFrame(rows)

# Create results DataFrame
results_df = create_results_dataframe(all_results)

# Display summary table
print("📊 COMPREHENSIVE RESULTS SUMMARY")
print("=" * 80)

# Show key metrics
display_columns = ['Dataset', 'Method', 'Compression Ratio', 'Correlation', 'RMSE', 'Total Time (s)']
summary_df = results_df[display_columns].round(4)
print(summary_df.to_string(index=False))

# Show best performers
print("\n🏆 BEST PERFORMERS:")
print("-" * 40)

for metric in ['Compression Ratio', 'Correlation', 'RMSE']:
    if metric == 'RMSE':
        best_row = results_df.loc[results_df[metric].idxmin()]
        print(f"Lowest {metric}: {best_row['Method']} on {best_row['Dataset']} ({best_row[metric]:.4f})")
    else:
        best_row = results_df.loc[results_df[metric].idxmax()]
        print(f"Best {metric}: {best_row['Method']} on {best_row['Dataset']} ({best_row[metric]:.4f})")

In [None]:
# Create comprehensive visualization of all results
def plot_comprehensive_comparison(results_df):
    """Create comprehensive comparison plots."""
    
    fig, axes = plt.subplots(2, 3, figsize=(20, 12))
    
    # Plot 1: Compression Ratio by Dataset and Method
    pivot_ratio = results_df.pivot(index='Dataset', columns='Method', values='Compression Ratio')
    sns.heatmap(pivot_ratio, annot=True, fmt='.2f', cmap='YlOrRd', ax=axes[0,0])
    axes[0,0].set_title('Compression Ratio by Dataset and Method', fontweight='bold')
    axes[0,0].set_ylabel('')
    
    # Plot 2: Correlation by Dataset and Method
    pivot_corr = results_df.pivot(index='Dataset', columns='Method', values='Correlation')
    sns.heatmap(pivot_corr, annot=True, fmt='.3f', cmap='RdYlGn', ax=axes[0,1], vmin=0, vmax=1)
    axes[0,1].set_title('Reconstruction Quality (Correlation)', fontweight='bold')
    axes[0,1].set_ylabel('')
    
    # Plot 3: Processing Time
    pivot_time = results_df.pivot(index='Dataset', columns='Method', values='Total Time (s)')
    sns.heatmap(pivot_time, annot=True, fmt='.3f', cmap='YlOrRd', ax=axes[0,2])
    axes[0,2].set_title('Total Processing Time (s)', fontweight='bold')
    axes[0,2].set_ylabel('')
    
    # Plot 4: Trade-off scatter plot
    colors = {'IFS': 'red', 'Fractal Coding': 'green'}
    for method in results_df['Method'].unique():
        method_data = results_df[results_df['Method'] == method]
        axes[1,0].scatter(method_data['Compression Ratio'], method_data['Correlation'], 
                         label=method, alpha=0.7, s=100, c=colors.get(method, 'blue'))
    
    axes[1,0].set_xlabel('Compression Ratio')
    axes[1,0].set_ylabel('Reconstruction Quality (Correlation)')
    axes[1,0].set_title('Quality vs Compression Trade-off', fontweight='bold')
    axes[1,0].legend()
    axes[1,0].grid(True, alpha=0.3)
    
    # Plot 5: Method comparison radar chart data preparation
    method_avg = results_df.groupby('Method').agg({
        'Compression Ratio': 'mean',
        'Correlation': 'mean',
        'SSIM': 'mean',
        'SNR (dB)': 'mean'
    }).reset_index()
    
    # Normalize metrics for radar chart (0-1 scale)
    metrics_to_plot = ['Compression Ratio', 'Correlation', 'SSIM', 'SNR (dB)']
    
    # Bar chart instead of radar for simplicity
    x_pos = np.arange(len(method_avg))
    width = 0.2
    
    for i, metric in enumerate(metrics_to_plot[:3]):  # Show first 3 metrics
        if metric == 'SNR (dB)':
            # Normalize SNR to 0-1 scale
            values = (method_avg[metric] - method_avg[metric].min()) / (method_avg[metric].max() - method_avg[metric].min())
        elif metric == 'Compression Ratio':
            # Normalize compression ratio
            values = method_avg[metric] / method_avg[metric].max()
        else:
            values = method_avg[metric]
        
        axes[1,1].bar(x_pos + i*width, values, width, label=metric, alpha=0.8)
    
    axes[1,1].set_xlabel('Method')
    axes[1,1].set_ylabel('Normalized Score')
    axes[1,1].set_title('Average Performance Comparison', fontweight='bold')
    axes[1,1].set_xticks(x_pos + width)
    axes[1,1].set_xticklabels(method_avg['Method'])
    axes[1,1].legend()
    axes[1,1].grid(True, alpha=0.3)
    
    # Plot 6: Dataset difficulty ranking
    dataset_avg = results_df.groupby('Dataset').agg({
        'Correlation': 'mean',
        'Compression Ratio': 'mean'
    }).reset_index()
    
    dataset_avg = dataset_avg.sort_values('Correlation', ascending=False)
    
    bars1 = axes[1,2].bar(range(len(dataset_avg)), dataset_avg['Correlation'], 
                         alpha=0.7, label='Avg Correlation', color='skyblue')
    axes[1,2].set_ylabel('Average Correlation', color='blue')
    axes[1,2].tick_params(axis='y', labelcolor='blue')
    
    # Secondary y-axis for compression ratio
    ax2 = axes[1,2].twinx()
    bars2 = ax2.bar(range(len(dataset_avg)), dataset_avg['Compression Ratio'], 
                   alpha=0.7, label='Avg Compression Ratio', color='orange', width=0.6)
    ax2.set_ylabel('Average Compression Ratio', color='orange')
    ax2.tick_params(axis='y', labelcolor='orange')
    
    axes[1,2].set_xlabel('Dataset')
    axes[1,2].set_title('Dataset Compression Difficulty', fontweight='bold')
    axes[1,2].set_xticks(range(len(dataset_avg)))
    axes[1,2].set_xticklabels(dataset_avg['Dataset'], rotation=45, ha='right')
    
    plt.suptitle('Comprehensive Fractal Compression Analysis', fontsize=18, fontweight='bold')
    plt.tight_layout()
    
    return fig

# Create comprehensive comparison plot
if not results_df.empty:
    comp_fig = plot_comprehensive_comparison(results_df)
    plt.show()
else:
    print("❌ No results to plot")

## 7. Fractal Properties Analysis

Let's examine how well our compression methods preserve fractal properties.

In [None]:
# Analyze fractal properties preservation
def analyze_fractal_properties(all_results):
    """Analyze how well fractal properties are preserved."""
    
    fig, axes = plt.subplots(2, 2, figsize=(16, 10))
    
    fractal_data = []
    
    for dataset_name, dataset_results in all_results.items():
        for method_name, method_results in dataset_results.items():
            if 'error' not in method_results and 'metrics' in method_results:
                metrics = method_results['metrics']
                
                # Extract fractal metrics
                fractal_info = {
                    'Dataset': dataset_name,
                    'Method': method_name,
                    'Original FD': metrics.get('fractal_fractal_dimension_original', 1.0),
                    'Reconstructed FD': metrics.get('fractal_fractal_dimension_reconstructed', 1.0),
                    'FD Error': metrics.get('fractal_fractal_dimension_error', 0.0),
                    'Original Hurst': metrics.get('fractal_hurst_exponent_original', 0.5),
                    'Reconstructed Hurst': metrics.get('fractal_hurst_exponent_reconstructed', 0.5),
                    'Hurst Error': metrics.get('fractal_hurst_exponent_error', 0.0)
                }
                fractal_data.append(fractal_info)
    
    if fractal_data:
        fractal_df = pd.DataFrame(fractal_data)
        
        # Plot 1: Fractal Dimension preservation
        for method in fractal_df['Method'].unique():
            method_data = fractal_df[fractal_df['Method'] == method]
            axes[0,0].scatter(method_data['Original FD'], method_data['Reconstructed FD'], 
                            label=method, alpha=0.7, s=100)
        
        # Perfect correlation line
        min_fd = min(fractal_df['Original FD'].min(), fractal_df['Reconstructed FD'].min())
        max_fd = max(fractal_df['Original FD'].max(), fractal_df['Reconstructed FD'].max())
        axes[0,0].plot([min_fd, max_fd], [min_fd, max_fd], 'r--', alpha=0.5, label='Perfect Preservation')
        
        axes[0,0].set_xlabel('Original Fractal Dimension')
        axes[0,0].set_ylabel('Reconstructed Fractal Dimension')
        axes[0,0].set_title('Fractal Dimension Preservation', fontweight='bold')
        axes[0,0].legend()
        axes[0,0].grid(True, alpha=0.3)
        
        # Plot 2: Hurst Exponent preservation
        for method in fractal_df['Method'].unique():
            method_data = fractal_df[fractal_df['Method'] == method]
            axes[0,1].scatter(method_data['Original Hurst'], method_data['Reconstructed Hurst'], 
                            label=method, alpha=0.7, s=100)
        
        # Perfect correlation line
        axes[0,1].plot([0, 1], [0, 1], 'r--', alpha=0.5, label='Perfect Preservation')
        
        axes[0,1].set_xlabel('Original Hurst Exponent')
        axes[0,1].set_ylabel('Reconstructed Hurst Exponent')
        axes[0,1].set_title('Hurst Exponent Preservation', fontweight='bold')
        axes[0,1].legend()
        axes[0,1].grid(True, alpha=0.3)
        
        # Plot 3: Error distributions
        methods = fractal_df['Method'].unique()
        x_pos = np.arange(len(methods))
        
        fd_errors = [fractal_df[fractal_df['Method'] == method]['FD Error'].mean() for method in methods]
        hurst_errors = [fractal_df[fractal_df['Method'] == method]['Hurst Error'].mean() for method in methods]
        
        width = 0.35
        axes[1,0].bar(x_pos - width/2, fd_errors, width, label='Fractal Dimension Error', alpha=0.8)
        axes[1,0].bar(x_pos + width/2, hurst_errors, width, label='Hurst Exponent Error', alpha=0.8)
        
        axes[1,0].set_xlabel('Method')
        axes[1,0].set_ylabel('Average Error')
        axes[1,0].set_title('Average Fractal Property Errors', fontweight='bold')
        axes[1,0].set_xticks(x_pos)
        axes[1,0].set_xticklabels(methods)
        axes[1,0].legend()
        axes[1,0].grid(True, alpha=0.3)
        
        # Plot 4: Dataset-wise fractal preservation
        dataset_fractal = fractal_df.groupby('Dataset').agg({
            'FD Error': 'mean',
            'Hurst Error': 'mean'
        }).reset_index()
        
        x_pos = np.arange(len(dataset_fractal))
        axes[1,1].bar(x_pos - width/2, dataset_fractal['FD Error'], width, 
                     label='Avg FD Error', alpha=0.8)
        axes[1,1].bar(x_pos + width/2, dataset_fractal['Hurst Error'], width, 
                     label='Avg Hurst Error', alpha=0.8)
        
        axes[1,1].set_xlabel('Dataset')
        axes[1,1].set_ylabel('Average Error')
        axes[1,1].set_title('Fractal Property Preservation by Dataset', fontweight='bold')
        axes[1,1].set_xticks(x_pos)
        axes[1,1].set_xticklabels(dataset_fractal['Dataset'], rotation=45, ha='right')
        axes[1,1].legend()
        axes[1,1].grid(True, alpha=0.3)
        
        plt.suptitle('Fractal Properties Preservation Analysis', fontsize=16, fontweight='bold')
        plt.tight_layout()
        
        return fig, fractal_df
    else:
        print("❌ No fractal data available for analysis")
        return None, None

# Analyze fractal properties
fractal_fig, fractal_df = analyze_fractal_properties(all_results)
if fractal_fig:
    plt.show()
    
    print("\n📊 FRACTAL PROPERTIES SUMMARY")
    print("=" * 50)
    if fractal_df is not None and not fractal_df.empty:
        summary_fractal = fractal_df.groupby('Method').agg({
            'FD Error': ['mean', 'std'],
            'Hurst Error': ['mean', 'std']
        }).round(4)
        print(summary_fractal)

## 8. Interactive Parameter Exploration

Let's explore how different parameters affect compression performance.

In [None]:
# Parameter sensitivity analysis
def parameter_sensitivity_analysis():
    """Analyze sensitivity to different parameters."""
    
    # Use the multi-sine dataset for analysis
    time_data, value_data = datasets['Multi-Sine']
    
    # IFS parameter sensitivity
    print("🔄 IFS Parameter Sensitivity Analysis...")
    ifs_results = {}
    
    n_transform_values = [2, 3, 4, 5, 6]
    for n_transforms in n_transform_values:
        try:
            compressor = IFSCompressor(n_transformations=n_transforms, max_iterations=50)
            comp_result = compressor.compress(time_data, value_data)
            decomp_result = compressor.decompress(comp_result)
            
            correlation = metrics_calc.pearson_correlation(value_data, decomp_result.reconstructed_data)
            
            ifs_results[n_transforms] = {
                'compression_ratio': comp_result.compression_ratio,
                'correlation': correlation,
                'time': comp_result.compression_time + decomp_result.decompression_time
            }
            
            print(f"   {n_transforms} transforms: ratio={comp_result.compression_ratio:.2f}, corr={correlation:.3f}")
            
        except Exception as e:
            print(f"   {n_transforms} transforms: Failed ({str(e)[:30]}...)")
    
    # Fractal Coding parameter sensitivity
    print("\n🔄 Fractal Coding Parameter Sensitivity Analysis...")
    fc_results = {}
    
    block_sizes = [4, 6, 8, 10, 12]
    for block_size in block_sizes:
        try:
            compressor = FractalCodingCompressor(
                range_block_size=block_size, 
                domain_block_size=block_size*2
            )
            comp_result = compressor.compress(time_data, value_data)
            decomp_result = compressor.decompress(comp_result)
            
            correlation = metrics_calc.pearson_correlation(value_data, decomp_result.reconstructed_data)
            
            fc_results[block_size] = {
                'compression_ratio': comp_result.compression_ratio,
                'correlation': correlation,
                'time': comp_result.compression_time + decomp_result.decompression_time
            }
            
            print(f"   Block size {block_size}: ratio={comp_result.compression_ratio:.2f}, corr={correlation:.3f}")
            
        except Exception as e:
            print(f"   Block size {block_size}: Failed ({str(e)[:30]}...)")
    
    return ifs_results, fc_results

# Run parameter sensitivity analysis
ifs_param_results, fc_param_results = parameter_sensitivity_analysis()

In [None]:
# Plot parameter sensitivity results
fig, axes = plt.subplots(2, 3, figsize=(18, 10))

# IFS parameter plots
if ifs_param_results:
    params = list(ifs_param_results.keys())
    ratios = [ifs_param_results[p]['compression_ratio'] for p in params]
    correlations = [ifs_param_results[p]['correlation'] for p in params]
    times = [ifs_param_results[p]['time'] for p in params]
    
    axes[0,0].plot(params, ratios, 'bo-', linewidth=2, markersize=8)
    axes[0,0].set_xlabel('Number of Transformations')
    axes[0,0].set_ylabel('Compression Ratio')
    axes[0,0].set_title('IFS: Compression Ratio vs Parameters', fontweight='bold')
    axes[0,0].grid(True, alpha=0.3)
    
    axes[0,1].plot(params, correlations, 'ro-', linewidth=2, markersize=8)
    axes[0,1].set_xlabel('Number of Transformations')
    axes[0,1].set_ylabel('Correlation')
    axes[0,1].set_title('IFS: Quality vs Parameters', fontweight='bold')
    axes[0,1].grid(True, alpha=0.3)
    
    axes[0,2].plot(params, times, 'go-', linewidth=2, markersize=8)
    axes[0,2].set_xlabel('Number of Transformations')
    axes[0,2].set_ylabel('Total Time (s)')
    axes[0,2].set_title('IFS: Processing Time vs Parameters', fontweight='bold')
    axes[0,2].grid(True, alpha=0.3)

# Fractal Coding parameter plots
if fc_param_results:
    params = list(fc_param_results.keys())
    ratios = [fc_param_results[p]['compression_ratio'] for p in params]
    correlations = [fc_param_results[p]['correlation'] for p in params]
    times = [fc_param_results[p]['time'] for p in params]
    
    axes[1,0].plot(params, ratios, 'bo-', linewidth=2, markersize=8)
    axes[1,0].set_xlabel('Range Block Size')
    axes[1,0].set_ylabel('Compression Ratio')
    axes[1,0].set_title('Fractal Coding: Compression Ratio vs Block Size', fontweight='bold')
    axes[1,0].grid(True, alpha=0.3)
    
    axes[1,1].plot(params, correlations, 'ro-', linewidth=2, markersize=8)
    axes[1,1].set_xlabel('Range Block Size')
    axes[1,1].set_ylabel('Correlation')
    axes[1,1].set_title('Fractal Coding: Quality vs Block Size', fontweight='bold')
    axes[1,1].grid(True, alpha=0.3)
    
    axes[1,2].plot(params, times, 'go-', linewidth=2, markersize=8)
    axes[1,2].set_xlabel('Range Block Size')
    axes[1,2].set_ylabel('Total Time (s)')
    axes[1,2].set_title('Fractal Coding: Processing Time vs Block Size', fontweight='bold')
    axes[1,2].grid(True, alpha=0.3)

plt.suptitle('Parameter Sensitivity Analysis', fontsize=16, fontweight='bold')
plt.tight_layout()
plt.show()

## 9. Conclusions and Summary

Let's summarize our findings and provide recommendations.

In [None]:
# Generate final summary and recommendations
def generate_final_summary(results_df, fractal_df):
    """Generate comprehensive summary and recommendations."""
    
    print("🎯 FRACTAL TIME SERIES COMPRESSION - FINAL SUMMARY")
    print("=" * 70)
    
    if not results_df.empty:
        # Overall performance
        print("\n📊 OVERALL PERFORMANCE:")
        print("-" * 30)
        
        overall_stats = results_df.groupby('Method').agg({
            'Compression Ratio': ['mean', 'std', 'min', 'max'],
            'Correlation': ['mean', 'std', 'min', 'max'],
            'Total Time (s)': ['mean', 'std']
        }).round(3)
        
        for method in results_df['Method'].unique():
            method_data = results_df[results_df['Method'] == method]
            
            print(f"\n{method}:")
            print(f"  Compression Ratio: {method_data['Compression Ratio'].mean():.2f} ± {method_data['Compression Ratio'].std():.2f}")
            print(f"  Correlation: {method_data['Correlation'].mean():.3f} ± {method_data['Correlation'].std():.3f}")
            print(f"  Processing Time: {method_data['Total Time (s)'].mean():.3f} ± {method_data['Total Time (s)'].std():.3f} seconds")
            print(f"  Best Dataset: {method_data.loc[method_data['Correlation'].idxmax(), 'Dataset']}")
            print(f"  Worst Dataset: {method_data.loc[method_data['Correlation'].idxmin(), 'Dataset']}")
        
        # Method comparison
        print("\n🏆 METHOD COMPARISON:")
        print("-" * 25)
        
        best_compression = results_df.loc[results_df['Compression Ratio'].idxmax()]
        best_quality = results_df.loc[results_df['Correlation'].idxmax()]
        fastest = results_df.loc[results_df['Total Time (s)'].idxmin()]
        
        print(f"Best Compression: {best_compression['Method']} on {best_compression['Dataset']} ({best_compression['Compression Ratio']:.2f}x)")
        print(f"Best Quality: {best_quality['Method']} on {best_quality['Dataset']} (r={best_quality['Correlation']:.4f})")
        print(f"Fastest: {fastest['Method']} on {fastest['Dataset']} ({fastest['Total Time (s)']:.3f}s)")
        
        # Dataset analysis
        print("\n📈 DATASET ANALYSIS:")
        print("-" * 20)
        
        dataset_difficulty = results_df.groupby('Dataset').agg({
            'Correlation': 'mean',
            'Compression Ratio': 'mean'
        }).sort_values('Correlation', ascending=False)
        
        print("Dataset Ranking (by average reconstruction quality):")
        for i, (dataset, row) in enumerate(dataset_difficulty.iterrows(), 1):
            print(f"  {i}. {dataset}: Avg Correlation = {row['Correlation']:.3f}, Avg Ratio = {row['Compression Ratio']:.2f}x")
        
        print("\n💡 RECOMMENDATIONS:")
        print("-" * 18)
        
        # Method-specific recommendations
        if 'IFS' in results_df['Method'].values:
            ifs_data = results_df[results_df['Method'] == 'IFS']
            print(f"\n🔄 IFS Compression:")
            print(f"  - Best for: {ifs_data.loc[ifs_data['Correlation'].idxmax(), 'Dataset']} datasets")
            print(f"  - Average compression: {ifs_data['Compression Ratio'].mean():.2f}x")
            print(f"  - Use when: Signal has global self-similarity")
            print(f"  - Avoid when: Signal is very noisy or has local variations")
        
        if 'Fractal Coding' in results_df['Method'].values:
            fc_data = results_df[results_df['Method'] == 'Fractal Coding']
            print(f"\n🔄 Fractal Coding:")
            print(f"  - Best for: {fc_data.loc[fc_data['Correlation'].idxmax(), 'Dataset']} datasets")
            print(f"  - Average compression: {fc_data['Compression Ratio'].mean():.2f}x")
            print(f"  - Use when: Signal has local self-similarity")
            print(f"  - Avoid when: Signal is completely random")
        
        print("\n🎯 GENERAL GUIDELINES:")
        print("  1. For smooth, periodic signals: Use IFS with 3-4 transformations")
        print("  2. For complex, multi-scale signals: Use Fractal Coding with block size 8-12")
        print("  3. For real-time applications: Consider processing time trade-offs")
        print("  4. For high-fidelity reconstruction: Combine methods with Fractal Interpolation")
        
        # Fractal properties summary
        if fractal_df is not None and not fractal_df.empty:
            print("\n🌀 FRACTAL PROPERTIES PRESERVATION:")
            print("-" * 40)
            
            fractal_summary = fractal_df.groupby('Method').agg({
                'FD Error': 'mean',
                'Hurst Error': 'mean'
            })
            
            for method, row in fractal_summary.iterrows():
                print(f"  {method}:")
                print(f"    Fractal Dimension Error: {row['FD Error']:.4f}")
                print(f"    Hurst Exponent Error: {row['Hurst Error']:.4f}")
        
        print("\n⚠️  LIMITATIONS:")
        print("-" * 15)
        print("  • Compression ratios lower than specialized algorithms (GZIP, LZ77)")
        print("  • Higher computational cost for optimization")
        print("  • Performance depends on signal's fractal properties")
        print("  • Parameter tuning required for optimal results")
        
        print("\n🚀 FUTURE WORK:")
        print("-" * 15)
        print("  • Adaptive parameter selection")
        print("  • Hybrid compression methods")
        print("  • GPU acceleration for optimization")
        print("  • Real-world dataset evaluation")
        
    else:
        print("❌ No results available for summary")
    
    print("\n" + "=" * 70)
    print("🎉 Analysis Complete! Thank you for exploring fractal compression!")
    print("=" * 70)

# Generate final summary
generate_final_summary(results_df, fractal_df if 'fractal_df' in locals() else None)

## 10. Save Results

Finally, let's save our results for future reference.

In [None]:
# Save results to files
import os
from datetime import datetime

# Create results directory
results_dir = 'notebook_results'
os.makedirs(results_dir, exist_ok=True)

# Save results DataFrame
if not results_df.empty:
    timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    
    # Save to CSV
    csv_path = os.path.join(results_dir, f'compression_results_{timestamp}.csv')
    results_df.to_csv(csv_path, index=False)
    print(f"✅ Results saved to: {csv_path}")
    
    # Save fractal results if available
    if 'fractal_df' in locals() and fractal_df is not None:
        fractal_csv_path = os.path.join(results_dir, f'fractal_results_{timestamp}.csv')
        fractal_df.to_csv(fractal_csv_path, index=False)
        print(f"✅ Fractal results saved to: {fractal_csv_path}")
    
    # Save parameter sensitivity results
    if ifs_param_results:
        import json
        param_path = os.path.join(results_dir, f'parameter_sensitivity_{timestamp}.json')
        with open(param_path, 'w') as f:
            json.dump({
                'ifs_parameters': ifs_param_results,
                'fractal_coding_parameters': fc_param_results
            }, f, indent=2)
        print(f"✅ Parameter sensitivity results saved to: {param_path}")
    
    print(f"\n📁 All results saved in directory: {results_dir}/")
    print("\n🎊 Notebook execution completed successfully!")
    
else:
    print("❌ No results to save")