In [None]:
!pip install statsmodels

# Gaussian Copula Demo

This notebook demonstrates:
1. **Why Copulas Matter** - Preserving correlation structure
2. **Multi-Column Fitting** - Fit distributions to multiple columns
3. **Copula Fitting** - Capture correlation via Spark ML (scales to billions)
4. **Correlated Sampling** - Local and distributed sampling
5. **Serialization** - Save and load copulas
6. **Performance Benchmark** - spark-bestfit vs statsmodels at scale

In [None]:
import numpy as np
import pandas as pd
from pyspark.sql import SparkSession

from spark_bestfit import DistributionFitter, GaussianCopula

In [None]:
# Create Spark session
spark = SparkSession.builder \
    .master("local[8]") \
    .appName("CopulaDemo") \
    .getOrCreate()

spark.sparkContext.setLogLevel("ERROR")

## 1. Why Copulas Matter: The Correlation Problem

When you fit distributions to columns independently, the correlation between columns is lost. Let's demonstrate this problem.

In [None]:
# Generate correlated data: price and quantity with negative correlation
# (higher price -> lower quantity, like demand curves)
np.random.seed(42)
n_samples = 10_000

# Create correlated normal samples
correlation = -0.7  # Strong negative correlation
cov_matrix = [[1.0, correlation], [correlation, 1.0]]
correlated_normals = np.random.multivariate_normal([0, 0], cov_matrix, n_samples)

# Transform to different distributions (price: lognormal, quantity: gamma)
from scipy import stats as st
price = st.lognorm.ppf(st.norm.cdf(correlated_normals[:, 0]), s=0.5, scale=100)
quantity = st.gamma.ppf(st.norm.cdf(correlated_normals[:, 1]), a=5, scale=20)

# Create DataFrame
pdf = pd.DataFrame({"price": price, "quantity": quantity})
df = spark.createDataFrame(pdf)

# Show the original correlation
original_corr = pdf.corr(method="spearman").iloc[0, 1]
print(f"Original Spearman correlation: {original_corr:.3f}")
print(f"\nSample statistics:")
print(pdf.describe().round(2))

## 2. The Problem: Independent Sampling Loses Correlation

If we fit each column independently and sample separately, correlation is lost.

In [None]:
# Fit distributions independently
fitter = DistributionFitter(spark, random_seed=42)
results = fitter.fit(df, columns=["price", "quantity"], max_distributions=10)

# Get best fits
best_price = results.for_column("price").best(n=1)[0]
best_quantity = results.for_column("quantity").best(n=1)[0]

print(f"Best fit for price: {best_price.distribution}")
print(f"Best fit for quantity: {best_quantity.distribution}")

In [None]:
# Sample independently - correlation is LOST!
independent_price = best_price.sample(size=10_000, random_state=42)
independent_quantity = best_quantity.sample(size=10_000, random_state=42)

independent_pdf = pd.DataFrame({
    "price": independent_price,
    "quantity": independent_quantity
})

independent_corr = independent_pdf.corr(method="spearman").iloc[0, 1]
print(f"Original correlation:    {original_corr:.3f}")
print(f"Independent correlation: {independent_corr:.3f}  <- LOST!")

## 3. The Solution: Gaussian Copula

A Gaussian copula preserves both:
- **Marginal distributions**: Each column follows its fitted distribution
- **Correlation structure**: Columns maintain their original relationships

**Big Data Advantage**: Unlike standard libraries that require `.toPandas()`, spark-bestfit computes correlation via Spark ML - scaling to billions of rows.

In [None]:
# Fit the copula - correlation computed via Spark ML (no .toPandas()!)
copula = GaussianCopula.fit(results, df)

print(f"Columns: {copula.column_names}")
print(f"\nCorrelation matrix (computed via Spark ML):")
print(pd.DataFrame(
    copula.correlation_matrix,
    index=copula.column_names,
    columns=copula.column_names
).round(3))

In [None]:
# Sample with correlation preserved!
copula_samples = copula.sample(n=10_000, random_state=42)
copula_pdf = pd.DataFrame(copula_samples)

copula_corr = copula_pdf.corr(method="spearman").iloc[0, 1]
print(f"Original correlation:    {original_corr:.3f}")
print(f"Independent correlation: {independent_corr:.3f}  <- Lost")
print(f"Copula correlation:      {copula_corr:.3f}  <- Preserved!")

## 4. Distributed Sampling with `sample_spark()`

For large-scale sampling (millions of correlated samples), use `sample_spark()` to leverage Spark's distributed computing.

In [None]:
# Generate 100,000 correlated samples using Spark
samples_df = copula.sample_spark(n=100_000, random_seed=42)

print(f"Schema: {samples_df.schema}")
print(f"\nSample preview:")
samples_df.show(5)

In [None]:
# Verify correlation is preserved at scale
spark_samples_pdf = samples_df.toPandas()
spark_corr = spark_samples_pdf.corr(method="spearman").iloc[0, 1]

print(f"Original correlation:     {original_corr:.3f}")
print(f"Spark sample correlation: {spark_corr:.3f}")
print(f"Difference:               {abs(original_corr - spark_corr):.3f}")

## 5. Verifying Marginal Distributions

The copula preserves marginal distributions - each column still follows its fitted distribution.

In [None]:
# K-S test: verify samples match the fitted marginal distributions
for col in copula.column_names:
    marginal = copula.marginals[col]
    dist = marginal.get_scipy_dist()
    samples = copula_pdf[col].values
    
    # K-S test against fitted distribution
    ks_stat, p_value = st.kstest(samples, lambda x: dist.cdf(x, *marginal.parameters))
    
    print(f"{col}:")
    print(f"  Distribution: {marginal.distribution}")
    print(f"  K-S statistic: {ks_stat:.4f}")
    print(f"  P-value: {p_value:.4f} {'(good fit!)' if p_value > 0.05 else '(poor fit)'}\n")

## 6. Serialization: Save and Load Copulas

Save fitted copulas for later use - great for production pipelines.

In [None]:
import tempfile
import os

# Save to JSON (human-readable, recommended)
with tempfile.TemporaryDirectory() as tmpdir:
    json_path = os.path.join(tmpdir, "copula.json")
    copula.save(json_path)
    
    # Show the JSON contents
    with open(json_path, "r") as f:
        import json
        data = json.load(f)
        print("JSON structure:")
        print(f"  schema_version: {data['schema_version']}")
        print(f"  type: {data['type']}")
        print(f"  columns: {data['column_names']}")
        print(f"  marginals: {list(data['marginals'].keys())}")
    
    # Load and verify
    loaded = GaussianCopula.load(json_path)
    loaded_samples = loaded.sample(n=1000, random_state=42)
    print(f"\nLoaded copula works! Generated {len(loaded_samples['price'])} samples.")

## 7. Three-Column Example

Copulas work with any number of columns (minimum 2).

In [None]:
# Generate correlated data: price, quantity, revenue
np.random.seed(123)
n = 10_000

# Create correlation structure
corr_3d = [
    [1.0,  -0.6, 0.3],   # price: neg corr with quantity, pos with revenue
    [-0.6, 1.0,  0.5],   # quantity: neg corr with price, pos with revenue
    [0.3,  0.5,  1.0],   # revenue: pos corr with both
]
correlated_normals_3d = np.random.multivariate_normal([0, 0, 0], corr_3d, n)

# Transform to different distributions
price_3d = st.lognorm.ppf(st.norm.cdf(correlated_normals_3d[:, 0]), s=0.3, scale=50)
quantity_3d = st.gamma.ppf(st.norm.cdf(correlated_normals_3d[:, 1]), a=10, scale=5)
revenue_3d = st.norm.ppf(st.norm.cdf(correlated_normals_3d[:, 2]), loc=1000, scale=200)

pdf_3d = pd.DataFrame({
    "price": price_3d,
    "quantity": quantity_3d,
    "revenue": revenue_3d
})
df_3d = spark.createDataFrame(pdf_3d)

print("Original correlation matrix:")
print(pdf_3d.corr(method="spearman").round(3))

In [None]:
# Fit distributions and copula
results_3d = fitter.fit(df_3d, columns=["price", "quantity", "revenue"], max_distributions=10)
copula_3d = GaussianCopula.fit(results_3d, df_3d)

# Sample and compare correlations
samples_3d = copula_3d.sample(n=10_000, random_state=42)
samples_3d_pdf = pd.DataFrame(samples_3d)

print("Copula correlation matrix (should match original):")
print(samples_3d_pdf.corr(method="spearman").round(3))

## 8. Performance Benchmark: When to Use spark-bestfit

**Important context**: spark-bestfit is NOT faster than statsmodels for small data on a single machine. The value is **scale** - handling data that doesn't fit in memory and generating samples across a cluster.

| Scenario | Recommendation |
|----------|----------------|
| Data fits in memory (< 10M rows) | statsmodels is faster |
| Data exceeds memory (100M+ rows) | spark-bestfit (only option) |
| Data already in Spark | spark-bestfit (avoid `.toPandas()`) |
| Need 100M+ samples | `sample_spark()` (distributed) |

> **Note**: This section requires `statsmodels` for comparison: `pip install statsmodels`

In [None]:
import time
from statsmodels.distributions.copula.api import GaussianCopula as StatsmodelsGaussianCopula
from scipy.stats import norm, gamma, lognorm
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.stat import Correlation

def benchmark_correlation_only(n_rows: int, n_runs: int = 3):
    """Benchmark ONLY correlation computation: pandas vs Spark ML."""
    
    # Generate test data with VALID correlation matrix
    np.random.seed(42)
    # Valid positive-semidefinite correlation matrix
    corr_matrix = [[1.0, 0.6, 0.3], [0.6, 1.0, 0.4], [0.3, 0.4, 1.0]]
    data = np.random.multivariate_normal([0, 0, 0], corr_matrix, n_rows)
    
    pdf_data = pd.DataFrame({"col1": data[:, 0], "col2": data[:, 1], "col3": data[:, 2]})
    spark_df = spark.createDataFrame(pdf_data)
    
    # Benchmark pandas (what statsmodels uses internally)
    pandas_times = []
    for _ in range(n_runs):
        start = time.time()
        corr = pdf_data.corr(method="spearman").values
        pandas_times.append(time.time() - start)
    
    # Benchmark Spark ML correlation ONLY (no distribution fitting)
    spark_times = []
    for _ in range(n_runs):
        start = time.time()
        assembler = VectorAssembler(inputCols=["col1", "col2", "col3"], outputCol="features", handleInvalid="skip")
        vector_df = assembler.transform(spark_df).select("features")
        corr_result = Correlation.corr(vector_df, "features", method="spearman")
        _ = corr_result.head()[0].toArray()  # Force execution
        spark_times.append(time.time() - start)
    
    return {
        "n_rows": n_rows,
        "pandas_ms": np.mean(pandas_times) * 1000,
        "spark_ml_ms": np.mean(spark_times) * 1000,
    }

print("Correlation-Only Benchmark (apples-to-apples)")
print("=" * 70)
print(f"{'N Rows':>12} | {'pandas (ms)':>14} | {'Spark ML (ms)':>14} | {'Notes':>22}")
print("-" * 70)

for n in [10_000, 100_000, 1_000_000]:
    result = benchmark_correlation_only(n, n_runs=2)
    ratio = result["spark_ml_ms"] / result["pandas_ms"]
    notes = f"Spark {ratio:.0f}x slower (overhead)"
    print(f"{result['n_rows']:>12,} | {result['pandas_ms']:>14.1f} | {result['spark_ml_ms']:>14.1f} | {notes:>22}")

print("\n** Spark overhead is expected for local mode. The value is MEMORY scale, not speed. **")

In [None]:
def benchmark_sampling(n_samples: int, n_runs: int = 3):
    """Benchmark sample generation: statsmodels vs spark-bestfit."""
    
    # Use the copula we already fitted (copula_3d from section 7)
    corr_matrix_np = copula_3d.correlation_matrix
    
    # Create statsmodels copula with same correlation
    sm_copula = StatsmodelsGaussianCopula(corr=corr_matrix_np, k_dim=3)
    
    # Benchmark statsmodels sampling (single-node, returns uniform only)
    sm_times = []
    for _ in range(n_runs):
        start = time.time()
        sm_samples = sm_copula.rvs(n_samples)
        sm_times.append(time.time() - start)
    
    # Benchmark spark-bestfit local sampling (includes marginal transform)
    local_times = []
    for _ in range(n_runs):
        start = time.time()
        samples = copula_3d.sample(n=n_samples, random_state=42)
        local_times.append(time.time() - start)
    
    # Benchmark spark-bestfit distributed sampling
    spark_times = []
    for _ in range(n_runs):
        start = time.time()
        samples_df = copula_3d.sample_spark(n=n_samples, random_seed=42)
        _ = samples_df.count()  # Force execution
        spark_times.append(time.time() - start)
    
    return {
        "n_samples": n_samples,
        "statsmodels_ms": np.mean(sm_times) * 1000,
        "local_ms": np.mean(local_times) * 1000,
        "spark_ms": np.mean(spark_times) * 1000,
    }

print("\nSampling Benchmark")
print("=" * 90)
print(f"{'N Samples':>12} | {'statsmodels (ms)':>16} | {'local (ms)':>12} | {'spark (ms)':>12} | {'Fastest':>14}")
print("-" * 90)

for n in [1_000_000, 10_000_000, 50_000_000]:
    result = benchmark_sampling(n, n_runs=2)
    times = [("statsmodels", result["statsmodels_ms"]), 
             ("local", result["local_ms"]), 
             ("spark", result["spark_ms"])]
    winner = min(times, key=lambda x: x[1])[0]
    print(f"{result['n_samples']:>12,} | {result['statsmodels_ms']:>16.1f} | {result['local_ms']:>12.1f} | {result['spark_ms']:>12.1f} | {winner:>14}")

print("\n** Note: statsmodels returns uniform samples only; spark-bestfit includes marginal transform. **")
print("** At 100M+ samples, single-node approaches may exhaust memory - spark_spark() scales. **")

### Benchmark Takeaways

**Correlation Computation:**
- pandas/statsmodels is **faster** for data that fits in memory (expected!)
- Spark ML has overhead but can handle **billions of rows** that would crash pandas

**Sampling:**
- Local methods (statsmodels, `sample()`) are faster for small-medium samples
- `sample_spark()` distributes work across the cluster for massive scale

**When spark-bestfit copula makes sense:**
1. **Data already in Spark** - avoid expensive `.toPandas()` to compute correlation
2. **Data exceeds memory** - 100M+ rows won't fit in pandas, Spark is the only option
3. **Massive sample generation** - 100M+ correlated samples for Monte Carlo simulation
4. **Production pipelines** - integrate with existing Spark ETL workflows

**When to use statsmodels instead:**
- Data fits comfortably in memory (< 10M rows)
- You're not already using Spark
- You need fastest possible local performance

## Summary

The `GaussianCopula` class enables correlated multi-column sampling **at scale**:

| Scenario | statsmodels | spark-bestfit |
|----------|-------------|---------------|
| Data < 10M rows | **Faster** (use this) | Slower (Spark overhead) |
| Data > 100M rows | Crashes (OOM) | **Works** (distributed) |
| Data in Spark | Requires `.toPandas()` | **Native** (no conversion) |
| 100M+ samples | May OOM | **`sample_spark()`** distributed |

**Use spark-bestfit copula when:**
- Data is already in a Spark DataFrame
- Data exceeds single-node memory (100M+ rows)
- You need 100M+ correlated samples for simulation
- You're building production Spark pipelines

**Key Methods:**
- `GaussianCopula.fit(results, df)` - Fit copula from multi-column results
- `copula.sample(n)` - Local sampling for small scale
- `copula.sample_spark(n)` - Distributed sampling for massive scale
- `copula.save(path)` / `GaussianCopula.load(path)` - Serialization

In [None]:
# Cleanup
spark.stop()