# 🧪 Meme Stock Deep Learning - Local Test Version

## 📋 Overview
- **Purpose**: Test on MacBook with small data subset
- **Models**: Lightweight versions of MLP, LSTM, Transformer
- **Data**: 100 samples for quick testing
- **Time**: ~2-5 minutes total

## 1️⃣ Setup

In [6]:
#!/usr/bin/env python3
"""
🧪 Local Test Version - Small data for MacBook testing
"""

import pandas as pd
import numpy as np
import json
import warnings
from datetime import datetime
import time
warnings.filterwarnings('ignore')

# ML libraries
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset

# Sklearn
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import mean_squared_error
from scipy.stats import spearmanr

# Report the environment once the imports have succeeded.
print("✅ Libraries imported")
print("PyTorch version: {}".format(torch.__version__))
print("Device: " + ("MPS" if torch.backends.mps.is_available() else "CPU"))

# Prefer Apple's Metal backend (MPS) when PyTorch reports it; otherwise use CPU.
# `device` is the module-level handle the training and summary cells read.
device = torch.device("mps" if torch.backends.mps.is_available() else "cpu")
print("🎯 Using Apple Metal GPU" if device.type == "mps" else "💻 Using CPU")

✅ Libraries imported
PyTorch version: 2.2.2
Device: CPU
💻 Using CPU


## 2️⃣ Create Small Test Data

In [None]:
def _load_real_sequences(n_samples):
    """Load the first ``n_samples`` real sequences from a ``sequences_*.npz`` file.

    Searches a fixed list of candidate directories and uses the first matching
    file. Inside it, the first array whose key ends in ``_sequences`` is used.

    Returns:
        A float32 array holding the first ``n_samples`` rows of that array, or
        ``None`` when no sequences file exists in any searched directory.

    Raises:
        ValueError: if a file is found but contains no ``*_sequences`` array,
            no numeric feature columns, or fewer than ``n_samples`` rows.
    """
    import glob
    import os

    # Check multiple possible paths
    paths_to_check = [
        'data/colab_datasets/',
        '../data/colab_datasets/',
        './data/colab_datasets/',
        './'
    ]

    sequences_file = None
    for path in paths_to_check:
        files = glob.glob(os.path.join(path, 'sequences_*.npz'))
        if files:
            sequences_file = files[0]
            break

    if sequences_file is None:
        return None

    print(f"📁 Found sequences file: {sequences_file}")
    # NOTE(security): allow_pickle=True is needed for object arrays, but
    # unpickling can execute arbitrary code -- only load trusted files.
    # The context manager closes the archive once the array is materialized.
    with np.load(sequences_file, allow_pickle=True) as sequences_data:
        print("✅ Loaded real sequence data (with allow_pickle=True)")

        # Look for a sequences key explicitly instead of assuming the first
        # key in the archive is one (archives may also hold targets/dates).
        seq_key = next(
            (key for key in sequences_data.files if key.endswith('_sequences')),
            None,
        )
        if seq_key is None:
            raise ValueError("No sequence data in file")
        real_sequences = sequences_data[seq_key]

    # Handle object dtype: keep only feature columns that convert to float
    # and contain at least one finite value.
    if real_sequences.dtype == object:
        print("⚠️ Converting object dtype to float32...")
        numeric_cols = []
        for i in range(real_sequences.shape[2]):
            try:
                test_col = real_sequences[:, :, i].astype(np.float32)
            except (ValueError, TypeError):
                # Non-numeric column (e.g. strings); leave it out.
                continue
            if np.any(np.isfinite(test_col)):
                numeric_cols.append(i)

        if not numeric_cols:
            raise ValueError("No numeric feature columns in real data")
        real_sequences = real_sequences[:, :, numeric_cols].astype(np.float32)
        print(f"✅ Using {len(numeric_cols)} numeric features from real data")

    if len(real_sequences) < n_samples:
        raise ValueError("Not enough samples in real data")

    X_sequence = real_sequences[:n_samples].astype(np.float32)
    print(f"✅ Using real data: ({n_samples}, {X_sequence.shape[1]}, {X_sequence.shape[2]})")
    return X_sequence


def create_test_data(n_samples=100, n_features=47, seq_length=20, seq_features=49):
    """Create a small train/val/test dataset for smoke-testing the models.

    Real sequence data is used when a ``sequences_*.npz`` file can be found
    and loaded; otherwise (no file, or any loading error) random sequences of
    shape ``(n_samples, seq_length, seq_features)`` are generated. Tabular
    features are always synthetic, and the targets are a noisy linear function
    of the first two tabular columns only, so ``n_features`` must be >= 2 and
    the sequence inputs carry no signal about the targets.

    Args:
        n_samples: Total number of samples before splitting.
        n_features: Number of synthetic tabular features.
        seq_length: Sequence length for synthetic sequences (ignored when real
            sequences are loaded).
        seq_features: Feature count for synthetic sequences (ignored when real
            sequences are loaded).

    Returns:
        Three tuples, each ordered ``(train, val, test)`` with a 60/20/20
        positional split: tabular features, sequence features, targets.
    """

    print(f"🔧 Creating test data: {n_samples} samples")

    # Set seed for reproducibility
    np.random.seed(42)

    # Try to load real data first. Loading is best-effort: any failure falls
    # back to synthetic sequences instead of aborting the notebook.
    X_sequence = None
    try:
        X_sequence = _load_real_sequences(n_samples)
        if X_sequence is None:
            print("⚠️ No sequences file found")
    except Exception as e:
        print(f"⚠️ Could not load real data: {e}")

    if X_sequence is None:
        print("📊 Using synthetic data instead...")

    # Tabular data is drawn first in both paths so the seeded random stream is
    # consumed in the same order as before.
    X_tabular = np.random.randn(n_samples, n_features).astype(np.float32)
    if X_sequence is None:
        X_sequence = np.random.randn(n_samples, seq_length, seq_features).astype(np.float32)

    # Clean any NaN/Inf values
    X_tabular = np.nan_to_num(X_tabular, nan=0.0, posinf=0.0, neginf=0.0)
    X_sequence = np.nan_to_num(X_sequence, nan=0.0, posinf=0.0, neginf=0.0)

    # Targets (with some pattern)
    y = (
        0.1 * X_tabular[:, 0] +
        0.05 * X_tabular[:, 1] +
        0.02 * np.random.randn(n_samples)
    ).astype(np.float32)

    # Split data: 60% train, 20% val, remainder test
    train_size = int(0.6 * n_samples)
    val_size = int(0.2 * n_samples)
    val_end = train_size + val_size

    def _split(array):
        """Slice ``array`` into (train, val, test) along the first axis."""
        return array[:train_size], array[train_size:val_end], array[val_end:]

    X_train_tab, X_val_tab, X_test_tab = _split(X_tabular)
    X_train_seq, X_val_seq, X_test_seq = _split(X_sequence)
    y_train, y_val, y_test = _split(y)

    print(f"✅ Test data created:")
    print(f"   Train: {len(y_train)} samples")
    print(f"   Val: {len(y_val)} samples")
    print(f"   Test: {len(y_test)} samples")
    print(f"   Sequence shape: ({X_train_seq.shape[0]}, {X_train_seq.shape[1]}, {X_train_seq.shape[2]})")

    return (
        (X_train_tab, X_val_tab, X_test_tab),
        (X_train_seq, X_val_seq, X_test_seq),
        (y_train, y_val, y_test)
    )

# Create test data
# create_test_data returns three (train, val, test) tuples: tabular features,
# sequence features, and targets. They are unpacked into the module-level
# names that the training cells read.
tabular_data, sequence_data, targets = create_test_data(n_samples=100)
X_train_tab, X_val_tab, X_test_tab = tabular_data
X_train_seq, X_val_seq, X_test_seq = sequence_data
y_train, y_val, y_test = targets

## 3️⃣ Lightweight Model Definitions

In [None]:
class SimpleMLP(nn.Module):
    """Small feed-forward regressor used for smoke tests.

    Every width in ``hidden_dims`` contributes a Linear -> ReLU -> Dropout(0.2)
    stage; a final Linear layer maps the last hidden width to one output.
    """

    def __init__(self, input_dim, hidden_dims=[64, 32]):
        super().__init__()

        # Consecutive (fan_in, fan_out) pairs: input -> h1 -> h2 -> ...
        widths = [input_dim, *hidden_dims]
        stages = []
        for fan_in, fan_out in zip(widths[:-1], widths[1:]):
            stages.append(nn.Linear(fan_in, fan_out))
            stages.append(nn.ReLU())
            stages.append(nn.Dropout(0.2))

        # Regression head: a single scalar per sample.
        stages.append(nn.Linear(widths[-1], 1))

        self.network = nn.Sequential(*stages)

    def forward(self, x):
        """Run ``x`` through the stacked layers and return the network output."""
        return self.network(x)


class SimpleLSTM(nn.Module):
    """Small LSTM regressor used for smoke tests.

    The LSTM runs with ``batch_first=True``; the output at the final time step
    feeds a two-layer head (hidden_size -> 16 -> 1).
    """

    def __init__(self, input_size, hidden_size=32, num_layers=1):
        super().__init__()

        self.lstm = nn.LSTM(
            input_size=input_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
        )

        head_layers = [
            nn.Linear(hidden_size, 16),
            nn.ReLU(),
            nn.Linear(16, 1),
        ]
        self.fc = nn.Sequential(*head_layers)

    def forward(self, x):
        """Encode the sequence and regress from the last time step's output."""
        sequence_out, _state = self.lstm(x)
        final_step = sequence_out[:, -1]
        return self.fc(final_step)


class SimpleTransformer(nn.Module):
    """Small Transformer-encoder regressor used for smoke tests.

    Inputs are projected to ``d_model``, passed through ``num_layers`` encoder
    layers (``batch_first=True``, feed-forward width ``2 * d_model``), averaged
    over dimension 1 and mapped to one output. No positional encoding is added.
    """

    def __init__(self, input_size, d_model=64, nhead=4, num_layers=2):
        super().__init__()

        self.input_projection = nn.Linear(input_size, d_model)

        self.transformer = nn.TransformerEncoder(
            nn.TransformerEncoderLayer(
                d_model=d_model,
                nhead=nhead,
                dim_feedforward=2 * d_model,
                dropout=0.1,
                batch_first=True,
            ),
            num_layers,
        )

        self.fc = nn.Linear(d_model, 1)

    def forward(self, x):
        """Project, encode, mean-pool over dimension 1, then regress."""
        encoded = self.transformer(self.input_projection(x))
        pooled = encoded.mean(dim=1)  # Global pooling
        return self.fc(pooled)

# Confirmation printed when this cell (the three model classes) has executed.
print("✅ Model architectures defined")

## 4️⃣ Quick Training Function

In [None]:
def quick_train(model, X_train, y_train, X_val, y_val,
                epochs=20, batch_size=16, lr=0.001, model_name="Model"):
    """Train ``model`` briefly with Adam + MSE and score it on validation data.

    Args:
        model: ``nn.Module`` to train. Assumed to return one value per sample
            (e.g. shape ``(batch, 1)``); outputs are flattened before the loss.
        X_train: Training features, convertible to a float32 tensor.
        y_train: Training targets, one per training sample.
        X_val: Validation features, convertible to a float32 tensor.
        y_val: Validation targets, one per validation sample.
        epochs: Number of passes over the training data.
        batch_size: Mini-batch size for the shuffled training loader.
        lr: Adam learning rate.
        model_name: Label used in progress messages.

    Returns:
        ``(model, train_losses, val_corr)``: the model (moved to the
        module-level ``device`` and left in eval mode), the mean batch loss for
        each epoch, and the Spearman correlation between ``y_val`` and the
        validation predictions.

    Raises:
        ValueError: If ``X_train`` is empty.
    """

    # Fail early with a clear message; otherwise the epoch average below
    # would divide by a zero-length loader.
    if len(X_train) == 0:
        raise ValueError("X_train is empty; nothing to train on")

    print(f"\n🎯 Training {model_name}...")
    start_time = time.time()

    model = model.to(device)
    criterion = nn.MSELoss()
    optimizer = optim.Adam(model.parameters(), lr=lr)

    # Create data loaders
    train_dataset = TensorDataset(
        torch.as_tensor(X_train, dtype=torch.float32),
        torch.as_tensor(y_train, dtype=torch.float32)
    )
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

    # Training loop
    train_losses = []

    for epoch in range(epochs):
        model.train()
        epoch_loss = 0

        for batch_X, batch_y in train_loader:
            batch_X, batch_y = batch_X.to(device), batch_y.to(device)

            optimizer.zero_grad()
            # Flatten rather than squeeze(): a bare squeeze() also drops the
            # batch dimension when the final batch holds a single sample,
            # leaving a 0-d output compared against a 1-element target.
            outputs = model(batch_X).reshape(-1)
            loss = criterion(outputs, batch_y.reshape(-1))
            loss.backward()
            optimizer.step()

            epoch_loss += loss.item()

        avg_loss = epoch_loss / len(train_loader)
        train_losses.append(avg_loss)

        if epoch % 5 == 0:
            print(f"  Epoch {epoch:2d}: Loss={avg_loss:.4f}")

    # Validation (single full-batch forward pass, no gradients)
    model.eval()
    with torch.no_grad():
        X_val_tensor = torch.as_tensor(X_val, dtype=torch.float32).to(device)
        val_pred = model(X_val_tensor).cpu().numpy().flatten()

    # Calculate metrics
    val_corr, _ = spearmanr(y_val, val_pred)
    val_rmse = np.sqrt(mean_squared_error(y_val, val_pred))

    elapsed = time.time() - start_time
    print(f"✅ {model_name} completed in {elapsed:.1f}s")
    print(f"   Val Correlation: {val_corr:.4f}")
    print(f"   Val RMSE: {val_rmse:.4f}")

    return model, train_losses, val_corr

## 5️⃣ Train Models

In [None]:
# Train MLP on the tabular features
mlp_model = SimpleMLP(input_dim=X_train_tab.shape[1])
mlp_model, mlp_losses, mlp_corr = quick_train(
    model=mlp_model,
    X_train=X_train_tab,
    y_train=y_train,
    X_val=X_val_tab,
    y_val=y_val,
    epochs=20,
    model_name="MLP",
)

In [None]:
# Train LSTM on the sequence features (input size = last axis of the sequences)
lstm_model = SimpleLSTM(input_size=X_train_seq.shape[2])
lstm_model, lstm_losses, lstm_corr = quick_train(
    model=lstm_model,
    X_train=X_train_seq,
    y_train=y_train,
    X_val=X_val_seq,
    y_val=y_val,
    epochs=20,
    model_name="LSTM",
)

In [None]:
# Train Transformer on the sequence features (input size = last axis of the sequences)
transformer_model = SimpleTransformer(input_size=X_train_seq.shape[2])
transformer_model, trans_losses, trans_corr = quick_train(
    model=transformer_model,
    X_train=X_train_seq,
    y_train=y_train,
    X_val=X_val_seq,
    y_val=y_val,
    epochs=20,
    model_name="Transformer",
)

## 6️⃣ Sequence Preparation & Real Data Check

In [None]:
def _numeric_feature_columns(sequences):
    """Return indices of last-axis columns that convert to float32 and hold a finite value."""
    numeric_cols = []
    for i in range(sequences.shape[2]):
        try:
            test_col = sequences[:, :, i].astype(np.float32)
        except (ValueError, TypeError):
            # Column holds non-numeric objects (e.g. strings); leave it out.
            continue
        if np.any(np.isfinite(test_col)):
            numeric_cols.append(i)
    return numeric_cols


def prepare_sequence_data_fixed(sequences_data, metadata,
                                train_end='2023-02-02', val_end='2023-07-15'):
    """Clean per-ticker sequences, stack them, and split them by date.

    For every ticker in ``metadata['tickers']`` that has a
    ``'<ticker>_sequences'`` entry, the sequences are converted to float32
    (object arrays keep only their numeric columns), NaN/Inf are replaced by
    0, and all tickers are truncated to the smallest feature count before
    stacking along the first axis.

    NOTE(review): truncation keeps the *first* ``min_features`` columns of each
    ticker; after dropping non-numeric columns these may not be the same
    features across tickers -- confirm against the data export.

    Args:
        sequences_data: Mapping (e.g. a loaded ``.npz``) with
            ``'<ticker>_sequences'``, ``'<ticker>_targets_1d'`` and
            ``'<ticker>_dates'`` entries.
        metadata: Mapping with a ``'tickers'`` list.
        train_end: Last date (inclusive) assigned to the training split.
        val_end: Last date (inclusive) assigned to the validation split;
            later dates go to the test split.

    Returns:
        ``(X_train_seq, X_val_seq, X_test_seq, y_train_seq, y_val_seq,
        y_test_seq)``. When no ticker yields usable sequences, a 6-tuple of
        ``None`` so callers can unpack the result uniformly.

    Raises:
        KeyError: If a ticker has sequences but no targets or dates entry.
    """

    print("🔄 Preparing sequence data...")

    min_features = None  # Track minimum feature count

    # First pass: clean each ticker and determine minimum common features
    cleaned_sequences_dict = {}

    for ticker in metadata['tickers']:
        seq_key = f'{ticker}_sequences'
        if seq_key not in sequences_data:
            continue

        sequences = sequences_data[seq_key]
        targets = sequences_data[f'{ticker}_targets_1d']
        dates = sequences_data[f'{ticker}_dates']

        # Handle object dtype (string columns)
        if sequences.dtype == object:
            print(f"   ⚠️ {ticker}: Cleaning object dtype...")

            numeric_cols = _numeric_feature_columns(sequences)
            if not numeric_cols:
                print(f"   ❌ {ticker}: No numeric columns, skipping")
                continue
            sequences = sequences[:, :, numeric_cols].astype(np.float32)
        else:
            sequences = sequences.astype(np.float32)

        # Clean NaN/Inf
        sequences = np.nan_to_num(sequences, nan=0.0, posinf=0.0, neginf=0.0)

        # Store cleaned sequences
        cleaned_sequences_dict[ticker] = {
            'sequences': sequences,
            'targets': targets,
            'dates': dates
        }

        # Track minimum features
        n_features = sequences.shape[2]
        if min_features is None or n_features < min_features:
            min_features = n_features

        print(f"   ✅ {ticker}: {sequences.shape}")

    if not cleaned_sequences_dict:
        print("❌ No valid sequences found")
        # Same arity as the success path so unpacking callers do not break.
        return (None,) * 6

    print(f"\n📊 Standardizing to {min_features} common features...")

    all_sequences = []
    all_targets = []
    all_dates = []

    # Second pass: standardize all sequences to same feature count
    for ticker, data in cleaned_sequences_dict.items():
        sequences = data['sequences']

        # If this ticker has more features, truncate to min_features
        if sequences.shape[2] > min_features:
            sequences = sequences[:, :, :min_features]
            print(f"   📏 {ticker}: Truncated to {min_features} features")

        all_sequences.append(sequences)
        all_targets.extend(data['targets'])
        all_dates.extend(data['dates'])

    # Stack all sequences (now they all have same dimensions)
    X_seq = np.vstack(all_sequences).astype(np.float32)
    y_seq = np.array(all_targets, dtype=np.float32)

    print(f"\n✅ Sequence data prepared:")
    print(f"   X_seq: {X_seq.shape}")
    print(f"   y_seq: {y_seq.shape}")
    print(f"   Features: {min_features} (standardized)")

    # Split by date. Each date is parsed individually so mixed input formats
    # are tolerated; the DatetimeIndex gives vectorized boolean comparisons.
    dates_index = pd.DatetimeIndex([pd.to_datetime(d) for d in all_dates])

    train_end = pd.to_datetime(train_end)
    val_end = pd.to_datetime(val_end)

    train_mask = np.asarray(dates_index <= train_end)
    val_mask = np.asarray((dates_index > train_end) & (dates_index <= val_end))
    test_mask = np.asarray(dates_index > val_end)

    X_train_seq = X_seq[train_mask]
    X_val_seq = X_seq[val_mask]
    X_test_seq = X_seq[test_mask]

    y_train_seq = y_seq[train_mask]
    y_val_seq = y_seq[val_mask]
    y_test_seq = y_seq[test_mask]

    print(f"\n📊 Sequence data split:")
    print(f"   Train: {X_train_seq.shape}")
    print(f"   Val: {X_val_seq.shape}")
    print(f"   Test: {X_test_seq.shape}")

    return (X_train_seq, X_val_seq, X_test_seq,
            y_train_seq, y_val_seq, y_test_seq)

# Test real data loading and sequence preparation
def test_real_data_loading():
    """Check that the exported real-data files can be loaded from disk.

    Looks for a ``*metadata*.json`` file under ``../data/colab_datasets``,
    then loads the first 50 rows of the matching tabular train CSV and the
    matching sequences ``.npz``, and finally tries
    ``prepare_sequence_data_fixed`` on the loaded sequences.

    Returns:
        ``True`` when the metadata, CSV and NPZ files all load (a failure in
        sequence preparation is reported but does not change the result);
        ``False`` when no metadata file exists or any load raises.
    """

    print("\n🔍 Testing real data loading...")

    data_dir = '../data/colab_datasets'

    try:
        # Try to load metadata
        import glob
        metadata_files = glob.glob(f'{data_dir}/*metadata*.json')

        if not metadata_files:
            print("⚠️ No metadata file found")
            return False

        with open(metadata_files[0], 'r') as f:
            metadata = json.load(f)

        timestamp = metadata['timestamp']
        print(f"✅ Found data with timestamp: {timestamp}")

        # Try loading CSVs
        train_df = pd.read_csv(f'{data_dir}/tabular_train_{timestamp}.csv', nrows=50)
        print(f"✅ Loaded train data: {train_df.shape}")

        # Try loading NPZ with allow_pickle=True
        # NOTE(security): allow_pickle=True can execute arbitrary code while
        # unpickling -- only point this at trusted files.
        # Arrays in an .npz are read lazily, so preparation must happen while
        # the archive is still open; the context manager closes it afterwards.
        with np.load(f'{data_dir}/sequences_{timestamp}.npz', allow_pickle=True) as sequences:
            print("✅ Loaded sequences with allow_pickle=True")

            # Test the fixed sequence preparation (best-effort: a failure here
            # is reported, but the files themselves did load).
            try:
                seq_data = prepare_sequence_data_fixed(sequences, metadata)
                if seq_data[0] is not None:
                    print("✅ Sequence preparation successful!")
            except Exception as e:
                print(f"⚠️ Sequence preparation failed: {e}")

        return True

    except Exception as e:
        print(f"❌ Error loading real data: {e}")
        return False

# Test real data loading
# The boolean result is read by the summary cell to report whether real data
# is available.
real_data_ok = test_real_data_loading()

## 7️⃣ Load Real Data Test

In [None]:
def test_real_data_loading():
    """Check that the exported real-data files can be opened.

    NOTE: this redefines the function of the same name from the previous cell;
    once this cell runs, this lighter version (no sequence preparation) is the
    one bound to ``test_real_data_loading``.

    Looks for a ``*metadata*.json`` file under ``../data/colab_datasets``,
    then loads the first 50 rows of the matching tabular train CSV and lists
    the keys of the matching sequences ``.npz``.

    Returns:
        ``True`` when all three files load; ``False`` when no metadata file
        exists or any load raises.
    """

    print("\n🔍 Testing real data loading...")

    data_dir = '../data/colab_datasets'

    try:
        # Try to load metadata
        import glob
        metadata_files = glob.glob(f'{data_dir}/*metadata*.json')

        if not metadata_files:
            print("⚠️ No metadata file found")
            return False

        with open(metadata_files[0], 'r') as f:
            metadata = json.load(f)

        timestamp = metadata['timestamp']
        print(f"✅ Found data with timestamp: {timestamp}")

        # Try loading CSVs
        train_df = pd.read_csv(f'{data_dir}/tabular_train_{timestamp}.csv', nrows=50)
        print(f"✅ Loaded train data: {train_df.shape}")

        # Try loading NPZ. Only the key names are read, so pickling support is
        # not needed; the context manager closes the archive afterwards.
        with np.load(f'{data_dir}/sequences_{timestamp}.npz') as sequences:
            print(f"✅ Loaded sequences: {list(sequences.keys())[:3]}...")

        return True

    except Exception as e:
        print(f"❌ Error loading real data: {e}")
        return False

# Test real data loading
# Re-runs the check with the function as redefined in this cell and overwrites
# the `real_data_ok` value assigned by the earlier cell.
real_data_ok = test_real_data_loading()

## 8️⃣ Summary

In [None]:
# Print a human-readable recap of the local smoke test, then persist it as JSON.
print("\n" + "="*50)
print("🎯 LOCAL TEST SUMMARY")
print("="*50)

print("\n✅ Models Tested:")
print("   - MLP: Working")
print("   - LSTM: Working")
print("   - Transformer: Working")

print("\n✅ Device:")
print(f"   - Using: {device}")
if device.type == "mps":
    print("   - Apple Metal GPU acceleration enabled")

print("\n✅ Data:")
print("   - Synthetic test data: Working")
if real_data_ok:
    print("   - Real data loading: Working")
else:
    print("   - Real data loading: Not available (expected on local)")

print("\n" + "="*50)
print("🚀 Ready for Colab A100 deployment!")
print("="*50)

# Per-model validation Spearman correlations returned by quick_train in the
# training cells. NaN is stored as None so the saved file stays strict JSON.
results = {
    name: {'val_correlation': None if np.isnan(corr) else float(corr)}
    for name, corr in (
        ('MLP', mlp_corr),
        ('LSTM', lstm_corr),
        ('Transformer', trans_corr),
    )
}

# Save test results
test_summary = {
    'timestamp': datetime.now().strftime('%Y%m%d_%H%M%S'),
    'device': str(device),
    'models_tested': ['MLP', 'LSTM', 'Transformer'],
    'test_results': results,
    'real_data_available': real_data_ok,
    'status': 'PASSED'
}

with open('local_test_results.json', 'w', encoding='utf-8') as f:
    json.dump(test_summary, f, indent=2, default=str)

print("\n✅ Test results saved to local_test_results.json")