# Bimodal Distribution Validation for Stock Diffusion Model

## Objective
Test whether the diffusion model can capture **multimodal distributions** rather than collapsing to mean prediction.

## Experiment Design
1. Create synthetic data with **identical input conditions** but **conflicting targets**
   - Same 120-second history pattern
   - Target A: +5% return (bullish outcome)
   - Target B: -5% return (bearish outcome)
2. Train the model on this conflicting data
3. At inference, sample 500 times from the same condition
4. Analyze the output distribution

## Success Criteria
- **Pass**: Output shows bimodal distribution (peaks at both +5% and -5%)
- **Fail**: Output shows unimodal distribution at 0% (mean collapse)

In [None]:
import sys
from pathlib import Path

# Add project root to path
project_root = Path.cwd().parent
sys.path.insert(0, str(project_root))

import numpy as np
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
import matplotlib.pyplot as plt
from tqdm import tqdm

from src.models.model import StockDiffusionModel, ModelConfig

# Set random seeds for reproducibility
torch.manual_seed(42)
np.random.seed(42)

# Device setup
if torch.backends.mps.is_available():
    device = torch.device("mps")
elif torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print(f"Using device: {device}")

## 1. Create Synthetic Bimodal Dataset

We create a dataset where:
- All samples have the **exact same** input condition (120-step sequence)
- Half the samples have target = +0.05 (5% up)
- Half the samples have target = -0.05 (5% down)

This creates a perfectly bimodal target distribution that the model must learn.

In [None]:
class BimodalDataset(Dataset):
    """
    Synthetic dataset with identical conditions but bimodal targets.
    
    All samples share the same input sequence, but targets are either
    +target_value or -target_value with equal probability.
    """
    
    def __init__(
        self,
        n_samples: int = 1000,
        seq_length: int = 120,
        n_features: int = 4,
        target_value: float = 0.05,  # 5% return
        condition_pattern: str = "sine"  # Pattern for the condition
    ):
        self.n_samples = n_samples
        self.seq_length = seq_length
        self.n_features = n_features
        self.target_value = target_value
        
        # Create a fixed condition pattern (same for all samples)
        self.condition = self._create_condition(condition_pattern)
        
        # Create bimodal targets: half +target, half -target
        self.targets = np.zeros(n_samples, dtype=np.float32)
        self.targets[:n_samples // 2] = target_value   # Bullish
        self.targets[n_samples // 2:] = -target_value  # Bearish
        
        # Shuffle to mix bullish and bearish samples
        np.random.shuffle(self.targets)
        
        print(f"Created bimodal dataset:")
        print(f"  Samples: {n_samples}")
        print(f"  Condition shape: ({seq_length}, {n_features})")
        print(f"  Targets: +{target_value:.2%} or {-target_value:.2%}")
        print(f"  Bullish samples: {(self.targets > 0).sum()}")
        print(f"  Bearish samples: {(self.targets < 0).sum()}")
    
    def _create_condition(self, pattern: str) -> np.ndarray:
        """Create a deterministic condition pattern."""
        t = np.linspace(0, 4 * np.pi, self.seq_length)
        
        if pattern == "sine":
            # Sine wave pattern (simulates oscillating price)
            features = np.zeros((self.seq_length, self.n_features), dtype=np.float32)
            features[:, 0] = 0.01 * np.sin(t)  # log_return
            features[:, 1] = 0.005 + 0.002 * np.sin(2 * t)  # volatility
            features[:, 2] = 0.001 * np.ones(self.seq_length)  # spread
            features[:, 3] = np.sin(t / 2)  # volume_zscore
        elif pattern == "trend_up":
            # Upward trend
            features = np.zeros((self.seq_length, self.n_features), dtype=np.float32)
            features[:, 0] = 0.001 * np.ones(self.seq_length)  # small positive returns
            features[:, 1] = 0.005 * np.ones(self.seq_length)
            features[:, 2] = 0.001 * np.ones(self.seq_length)
            features[:, 3] = np.linspace(-1, 1, self.seq_length)
        elif pattern == "flat":
            # Flat/consolidation
            features = np.zeros((self.seq_length, self.n_features), dtype=np.float32)
            features[:, 0] = 0.0001 * np.random.randn(self.seq_length)  # near-zero returns
            features[:, 1] = 0.003 * np.ones(self.seq_length)  # low volatility
            features[:, 2] = 0.0005 * np.ones(self.seq_length)
            features[:, 3] = np.zeros(self.seq_length)
        else:
            raise ValueError(f"Unknown pattern: {pattern}")
        
        return features
    
    def __len__(self) -> int:
        return self.n_samples
    
    def __getitem__(self, idx: int) -> dict:
        return {
            'input': torch.from_numpy(self.condition.copy()),  # Same condition for all
            'target': torch.tensor([self.targets[idx]], dtype=torch.float32)
        }
    
    def get_condition_tensor(self) -> torch.Tensor:
        """Get the shared condition as a tensor for inference."""
        return torch.from_numpy(self.condition.copy())

In [None]:
# Create dataset
N_SAMPLES = 2000  # Total training samples
SEQ_LENGTH = 120
N_FEATURES = 4
TARGET_VALUE = 0.05  # 5% return magnitude

dataset = BimodalDataset(
    n_samples=N_SAMPLES,
    seq_length=SEQ_LENGTH,
    n_features=N_FEATURES,
    target_value=TARGET_VALUE,
    condition_pattern="sine"
)

# Visualize the condition and target distribution
fig, axes = plt.subplots(1, 2, figsize=(14, 4))

# Plot condition pattern
ax = axes[0]
condition = dataset.condition
feature_names = ['log_return', 'volatility', 'spread', 'volume_zscore']
for i in range(N_FEATURES):
    ax.plot(condition[:, i], label=feature_names[i], alpha=0.8)
ax.set_xlabel('Time (seconds)')
ax.set_ylabel('Feature Value')
ax.set_title('Shared Input Condition (Same for All Samples)')
ax.legend()
ax.grid(True, alpha=0.3)

# Plot target distribution
ax = axes[1]
ax.hist(dataset.targets * 100, bins=50, edgecolor='black', alpha=0.7)
ax.axvline(x=TARGET_VALUE * 100, color='green', linestyle='--', linewidth=2, label=f'+{TARGET_VALUE:.0%}')
ax.axvline(x=-TARGET_VALUE * 100, color='red', linestyle='--', linewidth=2, label=f'-{TARGET_VALUE:.0%}')
ax.set_xlabel('Target Return (%)')
ax.set_ylabel('Count')
ax.set_title('Target Distribution (Ground Truth: Bimodal)')
ax.legend()
ax.grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

print(f"\nTarget statistics:")
print(f"  Mean: {dataset.targets.mean():.6f} (should be ~0)")
print(f"  Std:  {dataset.targets.std():.6f}")
print(f"  Min:  {dataset.targets.min():.6f}")
print(f"  Max:  {dataset.targets.max():.6f}")

## 2. Create and Train the Diffusion Model

We use a smaller model configuration for faster training on this simple task.

In [None]:
# Model configuration (smaller for quick experiment)
model_config = ModelConfig(
    encoder_type="transformer",
    input_features=N_FEATURES,
    seq_length=SEQ_LENGTH,
    encoder_dim=32,       # Smaller encoder
    encoder_layers=2,
    encoder_heads=4,
    diffusion_steps=500,  # Full diffusion steps
    noise_schedule="cosine",
    prediction_type="epsilon",
    denoiser_type="mlp",
    denoising_hidden_dim=64,  # Smaller denoiser
    denoising_blocks=3,
    time_embedding_dim=32,
    dropout=0.1
)

model = StockDiffusionModel(model_config)
model = model.to(device)

print(f"Model parameters: {model.get_num_params():,}")

In [None]:
def train_model(
    model: nn.Module,
    dataset: Dataset,
    epochs: int = 100,
    batch_size: int = 64,
    learning_rate: float = 1e-3,
    device: torch.device = device
) -> list:
    """Train the diffusion model on the bimodal dataset."""
    
    dataloader = DataLoader(
        dataset, 
        batch_size=batch_size, 
        shuffle=True,
        drop_last=True
    )
    
    optimizer = torch.optim.AdamW(model.parameters(), lr=learning_rate)
    scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(
        optimizer, T_max=epochs, eta_min=learning_rate / 10
    )
    
    losses = []
    model.train()
    
    pbar = tqdm(range(epochs), desc="Training")
    for epoch in pbar:
        epoch_loss = 0.0
        n_batches = 0
        
        for batch in dataloader:
            x_seq = batch['input'].to(device)
            target = batch['target'].to(device)
            
            optimizer.zero_grad()
            outputs = model(x_seq, target)
            loss = outputs['loss']
            loss.backward()
            
            # Gradient clipping
            torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
            
            optimizer.step()
            
            epoch_loss += loss.item()
            n_batches += 1
        
        scheduler.step()
        avg_loss = epoch_loss / n_batches
        losses.append(avg_loss)
        
        pbar.set_postfix({'loss': f'{avg_loss:.6f}', 'lr': f'{scheduler.get_last_lr()[0]:.2e}'})
    
    return losses

In [None]:
# Train the model
EPOCHS = 200  # Enough epochs to converge on this simple task
BATCH_SIZE = 64
LEARNING_RATE = 1e-3

print("Starting training...")
print(f"Epochs: {EPOCHS}, Batch size: {BATCH_SIZE}, LR: {LEARNING_RATE}")
print()

losses = train_model(
    model, 
    dataset, 
    epochs=EPOCHS, 
    batch_size=BATCH_SIZE,
    learning_rate=LEARNING_RATE
)

# Plot training loss
plt.figure(figsize=(10, 4))
plt.plot(losses)
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.title('Training Loss')
plt.grid(True, alpha=0.3)
plt.yscale('log')
plt.show()

print(f"\nFinal loss: {losses[-1]:.6f}")

## 3. Sample from the Trained Model

Now we sample 500 times from the model given the **same condition** and analyze the output distribution.

In [None]:
@torch.no_grad()
def sample_predictions(
    model: nn.Module,
    condition: torch.Tensor,
    n_samples: int = 500,
    use_ddim: bool = True,
    ddim_steps: int = 50,
    device: torch.device = device
) -> np.ndarray:
    """
    Sample multiple predictions from the diffusion model.
    
    Args:
        model: Trained diffusion model
        condition: Input condition tensor (seq_length, n_features)
        n_samples: Number of samples to generate
        use_ddim: Use DDIM sampling (faster)
        ddim_steps: Number of DDIM steps if use_ddim=True
        
    Returns:
        Array of sampled predictions (n_samples,)
    """
    model.eval()
    
    # Expand condition for batch sampling
    condition = condition.unsqueeze(0).to(device)  # (1, seq_len, features)
    condition = condition.expand(n_samples, -1, -1)  # (n_samples, seq_len, features)
    
    # Sample using the model's sample method
    samples = model.sample(
        condition,
        num_samples=1,  # Already batched
        use_ddim=use_ddim,
        ddim_steps=ddim_steps
    )
    
    return samples.cpu().numpy().flatten()

In [None]:
# Sample from the trained model
N_INFERENCE_SAMPLES = 500

print(f"Sampling {N_INFERENCE_SAMPLES} predictions from the model...")

# Get the shared condition
condition = dataset.get_condition_tensor()

# Sample predictions
predictions = sample_predictions(
    model,
    condition,
    n_samples=N_INFERENCE_SAMPLES,
    use_ddim=True,
    ddim_steps=50
)

print(f"\nSampled {len(predictions)} predictions")
print(f"Prediction statistics:")
print(f"  Mean: {predictions.mean():.6f}")
print(f"  Std:  {predictions.std():.6f}")
print(f"  Min:  {predictions.min():.6f}")
print(f"  Max:  {predictions.max():.6f}")

## 4. Analyze Results: Is the Distribution Bimodal?

We now compare the model's output distribution to the ground truth bimodal distribution.

In [None]:
def analyze_bimodality(predictions: np.ndarray, target_value: float, threshold: float = 0.02):
    """
    Analyze if the predictions show bimodal behavior.
    
    Args:
        predictions: Array of model predictions
        target_value: Expected peak locations (+/- target_value)
        threshold: Threshold for classifying predictions as bullish/bearish
        
    Returns:
        dict with analysis results
    """
    # Classify predictions
    bullish = predictions > threshold
    bearish = predictions < -threshold
    neutral = ~bullish & ~bearish
    
    n_total = len(predictions)
    n_bullish = bullish.sum()
    n_bearish = bearish.sum()
    n_neutral = neutral.sum()
    
    # Calculate bimodality score
    # A perfect bimodal distribution would have ~50% bullish, ~50% bearish, 0% neutral
    bimodal_score = min(n_bullish, n_bearish) / max(n_bullish, n_bearish, 1)
    spread_score = 1 - (n_neutral / n_total)  # Higher is better (less neutral)
    
    # Mean absolute deviation from target peaks
    bullish_deviation = np.abs(predictions[bullish] - target_value).mean() if n_bullish > 0 else float('inf')
    bearish_deviation = np.abs(predictions[bearish] - (-target_value)).mean() if n_bearish > 0 else float('inf')
    
    results = {
        'n_bullish': n_bullish,
        'n_bearish': n_bearish,
        'n_neutral': n_neutral,
        'pct_bullish': n_bullish / n_total * 100,
        'pct_bearish': n_bearish / n_total * 100,
        'pct_neutral': n_neutral / n_total * 100,
        'bimodal_score': bimodal_score,
        'spread_score': spread_score,
        'bullish_deviation': bullish_deviation,
        'bearish_deviation': bearish_deviation,
        'mean': predictions.mean(),
        'std': predictions.std()
    }
    
    return results

In [None]:
# Analyze bimodality
results = analyze_bimodality(predictions, TARGET_VALUE, threshold=TARGET_VALUE * 0.3)

print("=" * 60)
print("BIMODALITY ANALYSIS RESULTS")
print("=" * 60)
print(f"\nPrediction distribution:")
print(f"  Bullish (> {TARGET_VALUE * 0.3:.1%}):  {results['n_bullish']:4d} ({results['pct_bullish']:.1f}%)")
print(f"  Bearish (< {-TARGET_VALUE * 0.3:.1%}): {results['n_bearish']:4d} ({results['pct_bearish']:.1f}%)")
print(f"  Neutral (middle):       {results['n_neutral']:4d} ({results['pct_neutral']:.1f}%)")
print(f"\nBimodality metrics:")
print(f"  Balance score: {results['bimodal_score']:.3f} (1.0 = perfectly balanced)")
print(f"  Spread score:  {results['spread_score']:.3f} (1.0 = no neutral predictions)")
print(f"\nPeak accuracy:")
print(f"  Bullish peak deviation: {results['bullish_deviation']:.6f}")
print(f"  Bearish peak deviation: {results['bearish_deviation']:.6f}")
print(f"\nOverall statistics:")
print(f"  Mean: {results['mean']:.6f} (should be ~0 for balanced bimodal)")
print(f"  Std:  {results['std']:.6f} (should be ~{TARGET_VALUE:.4f} for bimodal)")

In [None]:
# Visualize the results
fig, axes = plt.subplots(1, 3, figsize=(16, 5))

# 1. Histogram comparison
ax = axes[0]
ax.hist(dataset.targets * 100, bins=50, alpha=0.5, label='Ground Truth', color='blue', density=True)
ax.hist(predictions * 100, bins=50, alpha=0.5, label='Model Predictions', color='orange', density=True)
ax.axvline(x=TARGET_VALUE * 100, color='green', linestyle='--', linewidth=2, alpha=0.7)
ax.axvline(x=-TARGET_VALUE * 100, color='red', linestyle='--', linewidth=2, alpha=0.7)
ax.axvline(x=0, color='gray', linestyle=':', linewidth=2, alpha=0.7, label='Mean (MSE collapse)')
ax.set_xlabel('Return (%)')
ax.set_ylabel('Density')
ax.set_title('Distribution Comparison')
ax.legend()
ax.grid(True, alpha=0.3)

# 2. Model predictions only (detailed)
ax = axes[1]
ax.hist(predictions * 100, bins=50, edgecolor='black', alpha=0.7, color='orange')
ax.axvline(x=TARGET_VALUE * 100, color='green', linestyle='--', linewidth=2, label=f'Expected +{TARGET_VALUE:.0%}')
ax.axvline(x=-TARGET_VALUE * 100, color='red', linestyle='--', linewidth=2, label=f'Expected -{TARGET_VALUE:.0%}')
ax.axvline(x=0, color='gray', linestyle=':', linewidth=2, label='Mean = 0')
ax.set_xlabel('Predicted Return (%)')
ax.set_ylabel('Count')
ax.set_title(f'Model Output Distribution (n={len(predictions)})')
ax.legend()
ax.grid(True, alpha=0.3)

# 3. Scatter plot of predictions
ax = axes[2]
ax.scatter(range(len(predictions)), predictions * 100, alpha=0.5, s=10)
ax.axhline(y=TARGET_VALUE * 100, color='green', linestyle='--', linewidth=2, label=f'+{TARGET_VALUE:.0%}')
ax.axhline(y=-TARGET_VALUE * 100, color='red', linestyle='--', linewidth=2, label=f'-{TARGET_VALUE:.0%}')
ax.axhline(y=0, color='gray', linestyle=':', linewidth=2)
ax.set_xlabel('Sample Index')
ax.set_ylabel('Predicted Return (%)')
ax.set_title('Individual Predictions')
ax.legend()
ax.grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

In [None]:
# Final verdict
print("=" * 60)
print("FINAL VERDICT")
print("=" * 60)

# Criteria for success:
# 1. Both bullish and bearish predictions exist (>20% each)
# 2. Neutral predictions are minority (<30%)
# 3. Std is close to target_value (not collapsed to 0)

is_bimodal = (
    results['pct_bullish'] > 20 and 
    results['pct_bearish'] > 20 and
    results['pct_neutral'] < 30 and
    results['std'] > TARGET_VALUE * 0.5
)

is_collapsed = (
    results['pct_neutral'] > 70 or
    results['std'] < TARGET_VALUE * 0.3
)

if is_bimodal:
    print("\n[PASS] The model has learned a BIMODAL distribution!")
    print("\nThe diffusion model correctly captures that the same input")
    print("condition can lead to different outcomes (bullish OR bearish).")
    print("This demonstrates the model's ability to learn probabilistic")
    print("distributions rather than just point estimates.")
elif is_collapsed:
    print("\n[FAIL] The model has COLLAPSED to mean prediction!")
    print("\nThe model outputs are clustered around the mean (0),")
    print("indicating it has degenerated into an MSE-like regression.")
    print("This means the diffusion process is not working correctly.")
    print("\nPossible causes:")
    print("  - Insufficient training")
    print("  - Model architecture issues")
    print("  - Noise schedule problems")
else:
    print("\n[PARTIAL] Results are inconclusive.")
    print("\nThe model shows some bimodal behavior but not clearly.")
    print("Consider training longer or adjusting hyperparameters.")

print("\n" + "=" * 60)

## 5. Additional Experiments

Let's run a few more experiments to validate the model's behavior under different conditions.

In [None]:
# Experiment: Compare DDPM vs DDIM sampling
print("Comparing DDPM vs DDIM sampling...")

# DDIM sampling (fast)
predictions_ddim = sample_predictions(
    model, condition, n_samples=300, use_ddim=True, ddim_steps=50
)

# DDPM sampling (slow but more accurate)
predictions_ddpm = sample_predictions(
    model, condition, n_samples=300, use_ddim=False
)

fig, axes = plt.subplots(1, 2, figsize=(12, 4))

ax = axes[0]
ax.hist(predictions_ddim * 100, bins=30, alpha=0.7, label='DDIM (50 steps)')
ax.axvline(x=TARGET_VALUE * 100, color='green', linestyle='--', linewidth=2)
ax.axvline(x=-TARGET_VALUE * 100, color='red', linestyle='--', linewidth=2)
ax.set_title(f'DDIM Sampling (std={predictions_ddim.std():.4f})')
ax.set_xlabel('Return (%)')
ax.legend()
ax.grid(True, alpha=0.3)

ax = axes[1]
ax.hist(predictions_ddpm * 100, bins=30, alpha=0.7, label='DDPM (500 steps)', color='orange')
ax.axvline(x=TARGET_VALUE * 100, color='green', linestyle='--', linewidth=2)
ax.axvline(x=-TARGET_VALUE * 100, color='red', linestyle='--', linewidth=2)
ax.set_title(f'DDPM Sampling (std={predictions_ddpm.std():.4f})')
ax.set_xlabel('Return (%)')
ax.legend()
ax.grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

In [None]:
# Experiment: Different target separations
print("\nTesting model on different target separations...")
print("(Using same trained model with different synthetic targets)\n")

# Test how well the model generalizes
# Note: The model was trained on ±5%, let's see if it can handle ±3% or ±7%

# Create datasets with different separations (for comparison only)
separations = [0.03, 0.05, 0.07]
fig, axes = plt.subplots(1, 3, figsize=(15, 4))

for ax, sep in zip(axes, separations):
    test_dataset = BimodalDataset(
        n_samples=100,
        seq_length=SEQ_LENGTH,
        n_features=N_FEATURES,
        target_value=sep,
        condition_pattern="sine"
    )
    
    ax.hist(test_dataset.targets * 100, bins=30, alpha=0.7, label='Ground Truth')
    ax.axvline(x=sep * 100, color='green', linestyle='--', linewidth=2)
    ax.axvline(x=-sep * 100, color='red', linestyle='--', linewidth=2)
    ax.set_title(f'Target: ±{sep:.0%}')
    ax.set_xlabel('Return (%)')
    ax.grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

print("\nNote: The model was trained on ±5% targets.")
print("For different separations, the model would need to be retrained.")

## 6. Summary

### What This Experiment Tests

This experiment tests the core capability of diffusion models: **learning multimodal probability distributions**.

### Key Insights

1. **If PASS (Bimodal)**: The diffusion model correctly learns that:
   - The same market condition can lead to different outcomes
   - The model captures uncertainty through the distribution shape
   - This is fundamentally different from MSE regression

2. **If FAIL (Collapsed)**: The model has degenerated to:
   - Predicting the mean of all possible outcomes
   - Lost the ability to represent multiple modes
   - Behaves like a standard regression model

### Implications for Stock Prediction

Real stock markets often have multimodal return distributions:
- Same technical pattern can lead to breakout OR breakdown
- Uncertainty around key events (earnings, FOMC)
- Regime changes in market conditions

A diffusion model that passes this test has the potential to:
- Provide realistic uncertainty estimates
- Identify scenarios with high outcome ambiguity
- Support better risk management decisions

In [None]:
# Save results for later analysis
import json

experiment_results = {
    'config': {
        'n_samples': N_SAMPLES,
        'seq_length': SEQ_LENGTH,
        'n_features': N_FEATURES,
        'target_value': TARGET_VALUE,
        'epochs': EPOCHS,
        'batch_size': BATCH_SIZE,
        'learning_rate': LEARNING_RATE,
        'inference_samples': N_INFERENCE_SAMPLES
    },
    'results': {
        'pct_bullish': float(results['pct_bullish']),
        'pct_bearish': float(results['pct_bearish']),
        'pct_neutral': float(results['pct_neutral']),
        'bimodal_score': float(results['bimodal_score']),
        'spread_score': float(results['spread_score']),
        'mean': float(results['mean']),
        'std': float(results['std'])
    },
    'final_loss': float(losses[-1]),
    'is_bimodal': bool(is_bimodal),
    'is_collapsed': bool(is_collapsed)
}

# Save to file
output_path = project_root / 'outputs' / 'bimodal_experiment_results.json'
output_path.parent.mkdir(parents=True, exist_ok=True)

with open(output_path, 'w') as f:
    json.dump(experiment_results, f, indent=2)

print(f"Results saved to: {output_path}")
print("\nExperiment complete!")