In [None]:
#!pip install numpy pandas torch matplotlib seaborn scikit-learn

In [3]:
!pip install numpy
!pip install pandas
!pip install torch
!pip install matplotlib
!pip install seaborn
!pip install scikit-learn

Defaulting to user installation because normal site-packages is not writeable
Defaulting to user installation because normal site-packages is not writeable
Defaulting to user installation because normal site-packages is not writeable
Defaulting to user installation because normal site-packages is not writeable
Defaulting to user installation because normal site-packages is not writeable
Defaulting to user installation because normal site-packages is not writeable


In [4]:
# Core libraries
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
import warnings
warnings.filterwarnings('ignore')

# Set style
plt.style.use('seaborn-v0_8-darkgrid')
sns.set_palette("husl")

# Check GPU availability
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using device: {device}")

Using device: cpu


In [5]:
# Load your volatility index data
data = pd.read_csv('DayByDay96_Volatility 10 Index.csv')

print("Dataset Shape:", data.shape)
print(f"Days: {data.shape[0]}, Intraday measurements: {data.shape[1]}")
print(f"\nData range: [{data.values.min():.2f}, {data.values.max():.2f}]")
print(f"Mean volatility: {data.values.mean():.2f}")
print(f"Std deviation: {data.values.std():.2f}")

FileNotFoundError: [Errno 2] No such file or directory: 'DayByDay96_Volatility 10 Index.csv'

In [None]:
# Visualize sample data
fig, axes = plt.subplots(2, 1, figsize=(15, 8))

# Plot first 5 days as lines
for i in range(5):
    axes[0].plot(data.iloc[i].values, label=f'Day {i+1}', alpha=0.7)
axes[0].set_title('Intraday Volatility Patterns (First 5 Days)')
axes[0].set_xlabel('Intraday Time Point')
axes[0].set_ylabel('Volatility Index')
axes[0].legend()

# Heatmap of first 30 days
sns.heatmap(data.iloc[:30], cmap='viridis', ax=axes[1], cbar_kws={'label': 'Volatility'})
axes[1].set_title('Volatility Heatmap (First 30 Days)')
axes[1].set_xlabel('Intraday Time Point')
axes[1].set_ylabel('Day')

plt.tight_layout()
plt.show()

In [None]:
def detect_periods(x, top_k=5):
    """
    Detect dominant periods using FFT - core to TimesNet
    
    Args:
        x: Time series tensor [batch, length]
        top_k: Number of top periods to return
    """
    # Flatten the data for FFT analysis
    x_flat = x.reshape(-1)
    
    # Apply FFT
    fft = np.fft.rfft(x_flat)
    frequencies = np.fft.rfftfreq(len(x_flat))
    
    # Get amplitude spectrum
    amplitudes = np.abs(fft)
    
    # Exclude DC component and find peaks
    amplitudes[0] = 0
    
    # Get top k frequencies
    top_freq_idx = np.argsort(amplitudes)[-top_k:][::-1]
    top_frequencies = frequencies[top_freq_idx]
    
    # Convert frequencies to periods
    periods = []
    for freq in top_frequencies:
        if freq > 0:
            period = int(1 / freq)
            if period > 1 and period < len(x_flat) // 2:
                periods.append(period)
    
    return sorted(list(set(periods)))[:top_k]

# Detect periods in volatility data
vol_array = data.values
detected_periods = detect_periods(vol_array, top_k=5)
print(f"Detected dominant periods: {detected_periods}")
print(f"Primary period: {detected_periods[0]} (likely intraday cycle)")
print(f"Secondary period: {detected_periods[1] if len(detected_periods) > 1 else 'N/A'}")

In [None]:
class InceptionBlock(nn.Module):
    """Inception module for multi-scale 2D convolutions"""
    def __init__(self, in_channels, out_channels):
        super().__init__()
        
        # Multiple kernel sizes for multi-scale patterns
        self.branch1x1 = nn.Conv2d(in_channels, out_channels, kernel_size=1)
        
        self.branch3x3 = nn.Sequential(
            nn.Conv2d(in_channels, out_channels, kernel_size=1),
            nn.Conv2d(out_channels, out_channels, kernel_size=3, padding=1)
        )
        
        self.branch5x5 = nn.Sequential(
            nn.Conv2d(in_channels, out_channels, kernel_size=1),
            nn.Conv2d(out_channels, out_channels, kernel_size=5, padding=2)
        )
        
        self.branch_pool = nn.Sequential(
            nn.MaxPool2d(kernel_size=3, stride=1, padding=1),
            nn.Conv2d(in_channels, out_channels, kernel_size=1)
        )
        
    def forward(self, x):
        branch1x1 = self.branch1x1(x)
        branch3x3 = self.branch3x3(x)
        branch5x5 = self.branch5x5(x)
        branch_pool = self.branch_pool(x)
        
        outputs = [branch1x1, branch3x3, branch5x5, branch_pool]
        return torch.cat(outputs, dim=1)


class TimesBlock(nn.Module):
    """
    TimesBlock: Core component that transforms 1D to 2D and applies Inception
    """
    def __init__(self, input_dim, hidden_dim, num_periods=5):
        super().__init__()
        self.input_dim = input_dim
        self.hidden_dim = hidden_dim
        self.num_periods = num_periods
        
        # Inception for 2D convolutions
        self.inception = InceptionBlock(1, hidden_dim // 4)
        
        # Adaptive weights for different periods
        self.period_weights = nn.Parameter(torch.ones(num_periods))
        
        # Output projection
        self.output_proj = nn.Linear(hidden_dim, input_dim)
        
        self.norm = nn.LayerNorm(input_dim)
        self.dropout = nn.Dropout(0.1)
        
    def forward(self, x, periods):
        """
        x: [batch, length, dim]
        periods: list of detected periods
        """
        batch_size, seq_len, dim = x.shape
        residual = x
        
        # Process each period
        period_outputs = []
        for i, period in enumerate(periods[:self.num_periods]):
            # Reshape to 2D based on period
            if seq_len % period == 0:
                height = seq_len // period
                reshaped = x.reshape(batch_size, height, period, dim)
                reshaped = reshaped.mean(dim=-1, keepdim=True)  # [B, H, W, 1]
                reshaped = reshaped.permute(0, 3, 1, 2)  # [B, 1, H, W]
                
                # Apply Inception
                conv_out = self.inception(reshaped)  # [B, hidden_dim, H, W]
                
                # Reshape back
                conv_out = conv_out.mean(dim=(2, 3))  # [B, hidden_dim]
                period_outputs.append(conv_out)
        
        # Adaptive aggregation
        if period_outputs:
            weights = F.softmax(self.period_weights[:len(period_outputs)], dim=0)
            aggregated = sum(w * out for w, out in zip(weights, period_outputs))
            
            # Project back to original dimension
            aggregated = aggregated.unsqueeze(1).expand(-1, seq_len, -1)
            output = self.output_proj(aggregated)
            
            # Residual connection
            output = self.norm(output + residual)
            output = self.dropout(output)
            return output
        else:

In [None]:
class TimesNet(nn.Module):
    """
    Complete TimesNet model for multiple tasks
    """
    def __init__(self, 
                 input_dim=96,
                 hidden_dim=256,
                 num_blocks=3,
                 num_periods=5,
                 task='forecast',  # 'forecast', 'classification', 'both'
                 num_classes=3,
                 forecast_horizon=96):
        super().__init__()
        
        self.task = task
        self.input_dim = input_dim
        self.forecast_horizon = forecast_horizon
        
        # Input embedding
        self.input_projection = nn.Linear(input_dim, hidden_dim)
        
        # Stack of TimesBlocks
        self.blocks = nn.ModuleList([
            TimesBlock(hidden_dim, hidden_dim * 2, num_periods)
            for _ in range(num_blocks)
        ])
        
        # Task-specific heads
        if task in ['forecast', 'both']:
            self.forecast_head = nn.Sequential(
                nn.Linear(hidden_dim, hidden_dim * 2),
                nn.ReLU(),
                nn.Dropout(0.1),
                nn.Linear(hidden_dim * 2, forecast_horizon)
            )
        
        if task in ['classification', 'both']:
            self.classification_head = nn.Sequential(
                nn.Linear(hidden_dim, hidden_dim),
                nn.ReLU(),
                nn.Dropout(0.1),
                nn.Linear(hidden_dim, num_classes)
            )
    
    def detect_periods_batch(self, x):
        """Detect periods for batch"""
        # Simplified: use pre-detected periods
        return [96, 192, 480, 5, 10]  # Common volatility periods
    
    def forward(self, x):
        """
        x: [batch, seq_len, input_dim] for historical data
        Returns: forecast and/or classification based on task
        """
        batch_size = x.shape[0]
        
        # Detect periods
        periods = self.detect_periods_batch(x)
        
        # Input projection
        x = self.input_projection(x)
        
        # Apply TimesBlocks
        for block in self.blocks:
            x = block(x, periods)
        
        outputs = {}
        
        # Forecasting output
        if self.task in ['forecast', 'both']:
            # Use last hidden state for forecasting
            forecast_input = x[:, -1, :]  # [batch, hidden_dim]
            forecast = self.forecast_head(forecast_input)
            outputs['forecast'] = forecast
        
        # Classification output
        if self.task in ['classification', 'both']:
            # Global pooling for classification
            class_input = x.mean(dim=1)  # [batch, hidden_dim]
            classification = self.classification_head(class_input)
            outputs['classification'] = classification
        
        return outputs
